Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/gazctl/gazctlcmd/editor/editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@ import (
"regexp"

log "github.com/sirupsen/logrus"
pb "go.gazette.dev/core/broker/protocol"
pc "go.gazette.dev/core/consumer/protocol"
)

var whitespace = regexp.MustCompile(`^\s*$`)

// RetryLoopArgs is arguments of EditRetryLoop.
type RetryLoopArgs struct {
// FilePrefix
FilePrefix string
FilePrefix string
ShardClient pc.ShardClient
JournalClient pb.JournalClient
// SelectFn returns content to be displayed within the editor.
// It is invoked on each retry of the editing loop.
SelectFn func() io.Reader
SelectFn func(pc.ShardClient, pb.JournalClient) io.Reader
// ApplyFn attempts to apply the edited contents. If it returns
// an error, the editor will be re-opened with the error message.
ApplyFn func(b []byte) error
ApplyFn func(b []byte, sc pc.ShardClient, jc pb.JournalClient) error
// AbortIfUnchanged indicates the editing loop should abort if the
// editor exits without making any file changes. In this case,
// ApplyFn is not called with edited content.
Expand Down Expand Up @@ -81,7 +85,7 @@ func EditRetryLoop(args RetryLoopArgs) error {
preEdit = userEdits
} else {
writePostHeader(buf, headerLines)
if _, err := io.Copy(buf, args.SelectFn()); err != nil {
if _, err := io.Copy(buf, args.SelectFn(args.ShardClient, args.JournalClient)); err != nil {
return err
}
preEdit = pruneHeader(buf.Bytes())
Expand Down Expand Up @@ -127,7 +131,7 @@ func EditRetryLoop(args RetryLoopArgs) error {
}

// Attempt to apply edits.
if err = args.ApplyFn(userEdits); err != nil {
if err = args.ApplyFn(userEdits, args.ShardClient, args.JournalClient); err != nil {
isRetry = true
retryMsg = err.Error()
continue
Expand Down
14 changes: 10 additions & 4 deletions cmd/gazctl/gazctlcmd/journals_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
log "github.com/sirupsen/logrus"
"go.gazette.dev/core/broker/client"
"go.gazette.dev/core/broker/journalspace"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/cmd/gazctl/gazctlcmd/editor"
pc "go.gazette.dev/core/consumer/protocol"
mbp "go.gazette.dev/core/mainboilerplate"
"gopkg.in/yaml.v2"
)
Expand All @@ -23,17 +25,21 @@ func init() {

func (cmd *cmdJournalsEdit) Execute([]string) error {
startup(JournalsCfg.BaseConfig)
var ctx = context.Background()
var rjc = JournalsCfg.Broker.MustRoutedJournalClient(ctx)

return editor.EditRetryLoop(editor.RetryLoopArgs{
FilePrefix: "gazctl-journals-edit-",
JournalClient: rjc,
SelectFn: cmd.selectSpecs,
ApplyFn: cmd.applySpecs,
AbortIfUnchanged: true,
})
}

// selectSpecs returns the hoisted YAML specs of journals matching the selector.
func (cmd *cmdJournalsEdit) selectSpecs() io.Reader {
var resp = listJournals(cmd.Selector)
func (cmd *cmdJournalsEdit) selectSpecs(_ pc.ShardClient, journalClient pb.JournalClient) io.Reader {
var resp = listJournals(journalClient, cmd.Selector)

if len(resp.Journals) == 0 {
log.WithField("selector", cmd.Selector).Panic("no journals match selector")
Expand All @@ -44,7 +50,7 @@ func (cmd *cmdJournalsEdit) selectSpecs() io.Reader {
return buf
}

func (cmd *cmdJournalsEdit) applySpecs(b []byte) error {
func (cmd *cmdJournalsEdit) applySpecs(b []byte, _ pc.ShardClient, journalClient pb.JournalClient) error {
var tree journalspace.Node
if err := yaml.UnmarshalStrict(b, &tree); err != nil {
return err
Expand All @@ -59,7 +65,7 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error {
}

var ctx = context.Background()
var resp, err = client.ApplyJournalsInBatches(ctx, JournalsCfg.Broker.MustJournalClient(ctx), req, cmd.MaxTxnSize)
var resp, err = client.ApplyJournalsInBatches(ctx, journalClient, req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply journals")
log.WithField("revision", resp.Header.Etcd.Revision).Info("successfully applied")

Expand Down
8 changes: 5 additions & 3 deletions cmd/gazctl/gazctlcmd/journals_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ exactly equivalent to the original JournalSpecs.
func (cmd *cmdJournalsList) Execute([]string) error {
startup(JournalsCfg.BaseConfig)

var resp = listJournals(cmd.Selector)
var ctx = context.Background()
var rjc = JournalsCfg.Broker.MustRoutedJournalClient(ctx)
var resp = listJournals(rjc, cmd.Selector)

switch cmd.Format {
case "table":
Expand Down Expand Up @@ -135,15 +137,15 @@ func (cmd *cmdJournalsList) outputYAML(resp *pb.ListResponse) {
writeHoistedJournalSpecTree(os.Stdout, resp)
}

func listJournals(s string) *pb.ListResponse {
func listJournals(journalClient pb.JournalClient, s string) *pb.ListResponse {
var err error
var req pb.ListRequest
var ctx = context.Background()

req.Selector, err = pb.ParseLabelSelector(s)
mbp.Must(err, "failed to parse label selector", "selector", s)

resp, err := client.ListAllJournals(ctx, JournalsCfg.Broker.MustJournalClient(ctx), req)
resp, err := client.ListAllJournals(ctx, journalClient, req)
mbp.Must(err, "failed to list journals")

return resp
Expand Down
4 changes: 3 additions & 1 deletion cmd/gazctl/gazctlcmd/journals_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ See "journals list --help" for details and examples.

func (cmd *cmdJournalsPrune) Execute([]string) error {
startup(JournalsCfg.BaseConfig)
var ctx = context.Background()
var rjc = JournalsCfg.Broker.MustRoutedJournalClient(ctx)

var resp = listJournals(cmd.Selector)
var resp = listJournals(rjc, cmd.Selector)
if len(resp.Journals) == 0 {
log.WithField("selector", cmd.Selector).Panic("no journals match selector")
}
Expand Down
19 changes: 14 additions & 5 deletions cmd/gazctl/gazctlcmd/shards_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/cmd/gazctl/gazctlcmd/editor"
"go.gazette.dev/core/consumer"
pc "go.gazette.dev/core/consumer/protocol"
"go.gazette.dev/core/consumer/shardspace"
mbp "go.gazette.dev/core/mainboilerplate"
"gopkg.in/yaml.v2"
Expand All @@ -24,16 +26,23 @@ func init() {

func (cmd *cmdShardsEdit) Execute([]string) error {
startup(ShardsCfg.BaseConfig)

var ctx = context.Background()
var rsc = ShardsCfg.Consumer.MustRoutedShardClient(ctx)
var rjc = ShardsCfg.Broker.MustRoutedJournalClient(ctx)

return editor.EditRetryLoop(editor.RetryLoopArgs{
ShardClient: rsc,
JournalClient: rjc,
FilePrefix: "gazctl-shards-edit-",
SelectFn: cmd.selectSpecs,
ApplyFn: cmd.applyShardSpecYAML,
AbortIfUnchanged: true,
})
}

func (cmd *cmdShardsEdit) selectSpecs() io.Reader {
var resp = listShards(cmd.Selector)
func (cmd *cmdShardsEdit) selectSpecs(client pc.ShardClient, _ pb.JournalClient) io.Reader {
var resp = listShards(client, cmd.Selector)

var buf = &bytes.Buffer{}
if len(resp.Shards) == 0 {
Expand All @@ -44,7 +53,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader {
return buf
}

func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error {
func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte, shardClient pc.ShardClient, journalClient pb.JournalClient) error {
var set shardspace.Set
if err := yaml.UnmarshalStrict(b, &set); err != nil {
return err
Expand All @@ -55,11 +64,11 @@ func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error {

if err := req.Validate(); err != nil {
return err
} else if err = consumer.VerifyReferencedJournals(ctx, ShardsCfg.Broker.MustJournalClient(ctx), req); err != nil {
} else if err = consumer.VerifyReferencedJournals(ctx, journalClient, req); err != nil {
return errors.WithMessage(err, "verifying referenced journals")
}

var resp, err = consumer.ApplyShardsInBatches(ctx, ShardsCfg.Consumer.MustShardClient(ctx), req, cmd.MaxTxnSize)
var resp, err = consumer.ApplyShardsInBatches(ctx, shardClient, req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply shards")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
return nil
Expand Down
8 changes: 5 additions & 3 deletions cmd/gazctl/gazctlcmd/shards_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ as fetching consumption lag for a large number of shards may take a while.
func (cmd *cmdShardsList) Execute([]string) error {
startup(ShardsCfg.BaseConfig)

var resp = listShards(cmd.Selector)
var ctx = context.Background()
var rsc = ShardsCfg.Consumer.MustRoutedShardClient(ctx)

var resp = listShards(rsc, cmd.Selector)
switch cmd.Format {
case "table":
cmd.outputTable(resp)
Expand Down Expand Up @@ -139,15 +141,15 @@ func (cmd *cmdShardsList) outputTable(resp *pc.ListResponse) {
table.Render()
}

func listShards(s string) *pc.ListResponse {
func listShards(client pc.ShardClient, s string) *pc.ListResponse {
var err error
var req = new(pc.ListRequest)
var ctx = context.Background()

req.Selector, err = pb.ParseLabelSelector(s)
mbp.Must(err, "failed to parse label selector", "selector", s)

resp, err := consumer.ListShards(ctx, pc.NewShardClient(ShardsCfg.Consumer.MustDial(ctx)), req)
resp, err := consumer.ListShards(ctx, client, req)
mbp.Must(err, "failed to list shards")

return resp
Expand Down
18 changes: 10 additions & 8 deletions cmd/gazctl/gazctlcmd/shards_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
startup(ShardsCfg.BaseConfig)

var ctx = context.Background()
var rsc = ShardsCfg.Consumer.MustRoutedShardClient(ctx)
var rjc = ShardsCfg.Broker.MustRoutedJournalClient(ctx)

var metrics = shardsPruneMetrics{}
var logSegmentSets = make(map[pb.Journal]recoverylog.SegmentSet)
var skipRecoveryLogs = make(map[pb.Journal]bool)

for _, shard := range listShards(cmd.Selector).Shards {
for _, shard := range listShards(rsc, cmd.Selector).Shards {
metrics.shardsTotal++
var lastHints = fetchOldestHints(ctx, shard.Spec.Id)
var lastHints = fetchOldestHints(ctx, rsc, shard.Spec.Id)
var recoveryLog = shard.Spec.RecoveryLog()

// We require that we see hints for _all_ shards before we may make _any_ deletions.
Expand Down Expand Up @@ -82,7 +85,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
continue
}
log.WithField("journal", journal).Debug("checking fragments of journal")
for _, f := range fetchFragments(ctx, journal) {
for _, f := range fetchFragments(ctx, rjc, journal) {
var spec = f.Spec

metrics.fragmentsTotal++
Expand Down Expand Up @@ -110,12 +113,12 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
return nil
}

func fetchOldestHints(ctx context.Context, id pc.ShardID) *recoverylog.FSMHints {
func fetchOldestHints(ctx context.Context, shardClient pc.ShardClient, id pc.ShardID) *recoverylog.FSMHints {
var req = &pc.GetHintsRequest{
Shard: id,
}

var resp, err = consumer.FetchHints(ctx, ShardsCfg.Consumer.MustShardClient(ctx), req)
var resp, err = consumer.FetchHints(ctx, shardClient, req)
mbp.Must(err, "failed to fetch hints")
if resp.Status != pc.Status_OK {
err = fmt.Errorf(resp.Status.String())
Expand All @@ -131,14 +134,13 @@ func fetchOldestHints(ctx context.Context, id pc.ShardID) *recoverylog.FSMHints
return nil
}

func fetchFragments(ctx context.Context, journal pb.Journal) []pb.FragmentsResponse__Fragment {
func fetchFragments(ctx context.Context, journalClient pb.RoutedJournalClient, journal pb.Journal) []pb.FragmentsResponse__Fragment {
var err error
var req = pb.FragmentsRequest{
Journal: journal,
}
var brokerClient = JournalsCfg.Broker.MustRoutedJournalClient(ctx)

resp, err := client.ListAllFragments(ctx, brokerClient, req)
resp, err := client.ListAllFragments(ctx, journalClient, req)
mbp.Must(err, "failed to fetch fragments")

return resp.Fragments
Expand Down
6 changes: 3 additions & 3 deletions cmd/gazctl/gazctlcmd/shards_unassign.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func (cmd *cmdShardsUnassign) Execute([]string) (err error) {

var ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var rsc = ShardsCfg.Consumer.MustRoutedShardClient(ctx)

var listResp = listShards(cmd.Selector)
var listResp = listShards(rsc, cmd.Selector)
if listResp.Status != pc.Status_OK {
return fmt.Errorf("unexpected listShard status: %v", listResp.Status.String())
}
var client = pc.NewShardClient(ShardsCfg.Consumer.MustDial(ctx))

// Compute the set of shard IDs which should have assignments removed.
var shardIDs []pc.ShardID
Expand All @@ -65,7 +65,7 @@ func (cmd *cmdShardsUnassign) Execute([]string) (err error) {
chunk = 100
}

var resp, err = client.Unassign(pb.WithDispatchDefault(ctx), &pc.UnassignRequest{
var resp, err = rsc.Unassign(pb.WithDispatchDefault(ctx), &pc.UnassignRequest{
Shards: shardIDs[:chunk],
OnlyFailed: cmd.Failed,
DryRun: cmd.DryRun,
Expand Down