Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions broker/list_apply_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,30 +219,26 @@ func (svc *Service) Apply(ctx context.Context, claims pb.Claims, req *pb.ApplyRe
return nil, err
}

// The Apply API is authorized exclusively through the "name" label.
var authorizeJournal = func(claims *pb.Claims, journal pb.Journal) error {
if !claims.Selector.Matches(pb.MustLabelSet("name", journal.String())) {
return status.Error(codes.Unauthenticated, fmt.Sprintf("not authorized to %s", journal))
}
return nil
}

var cmp []clientv3.Cmp
var ops []clientv3.Op
var s = svc.resolver.state
var scratch pb.LabelSet

for _, change := range req.Changes {
var key string

if change.Upsert != nil {
if err = authorizeJournal(&claims, change.Upsert.Name); err != nil {
return nil, err
// For Upserts, authorize against the journal's labels plus meta-labels
scratch = change.Upsert.LabelSetExt(scratch)
if !claims.Selector.Matches(scratch) {
return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("not authorized to %s", change.Upsert.Name))
}
key = allocator.ItemKey(s.KS, change.Upsert.Name.String())
ops = append(ops, clientv3.OpPut(key, change.Upsert.MarshalString()))
} else {
if err = authorizeJournal(&claims, change.Delete); err != nil {
return nil, err
// For Deletes, use name-only authorization
if !claims.Selector.Matches(pb.MustLabelSet("name", change.Delete.String())) {
return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("not authorized to %s", change.Delete))
}
key = allocator.ItemKey(s.KS, change.Delete.String())
ops = append(ops, clientv3.OpDelete(key))
Expand Down
41 changes: 41 additions & 0 deletions broker/list_apply_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,47 @@ func TestApplyCases(t *testing.T) {
})
require.ErrorContains(t, err, "not authorized to journal/B")

// Case: Upsert with labels requires claims to match all labels (not just name).
// Create a context with claims that require env=staging
var ctxRequireStagingEnv = pb.WithClaims(ctx,
pb.Claims{
Capability: pb.Capability_APPLY,
Selector: pb.MustLabelSelector("name=something/else,env=staging"),
})
var specWithLabels = pb.JournalSpec{
Name: "something/else", // This matches the name in claims
LabelSet: pb.MustLabelSet("env", "production"), // But env=production doesn't match env=staging
Replication: 1,
Fragment: fragSpec,
}
_, err = broker.client().Apply(
ctxRequireStagingEnv,
&pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{{Upsert: &specWithLabels}},
})
require.ErrorContains(t, err, "not authorized to something/else")

// Case: Upsert succeeds when claims selector matches all labels.
var ctxMatchingClaims = pb.WithClaims(ctx,
pb.Claims{
Capability: pb.Capability_APPLY,
Selector: pb.MustLabelSelector("name=something/else,env=production"),
})
require.Equal(t, pb.Status_OK,
must(broker.client().Apply(
ctxMatchingClaims,
&pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{{Upsert: &specWithLabels}},
})).Status)

// Case: Delete still only requires name to match (not other labels).
require.Equal(t, pb.Status_OK,
must(broker.client().Apply(
ctxNarrowClaims,
&pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{{Delete: "something/else", ExpectModRevision: -1}},
})).Status)

broker.cleanup()
}

Expand Down
19 changes: 8 additions & 11 deletions consumer/shard_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,18 @@ func ShardApply(ctx context.Context, claims pb.Claims, srv *Service, req *pc.App
return nil, err
}

// The Apply API is authorized exclusively through the "id" label.
var authorizeShard = func(claims *pb.Claims, shard pc.ShardID) error {
if !claims.Selector.Matches(pb.MustLabelSet("id", shard.String())) {
return status.Error(codes.Unauthenticated, fmt.Sprintf("not authorized to %s", shard))
}
return nil
}
var cmp []clientv3.Cmp
var ops []clientv3.Op
var scratch pb.LabelSet

for _, change := range req.Changes {
var key string

if change.Upsert != nil {
if err := authorizeShard(&claims, change.Upsert.Id); err != nil {
return nil, err
// For Upserts, authorize against the shard's labels plus meta-labels
scratch = change.Upsert.LabelSetExt(scratch)
if !claims.Selector.Matches(scratch) {
return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("not authorized to %s", change.Upsert.Id))
}
key = allocator.ItemKey(s.KS, change.Upsert.Id.String())
ops = append(ops, clientv3.OpPut(key, change.Upsert.MarshalString()))
Expand All @@ -136,8 +132,9 @@ func ShardApply(ctx context.Context, claims pb.Claims, srv *Service, req *pc.App
ops = append(ops, clientv3.OpPut(change.Upsert.HintPrimaryKey(), string(val)))
}
} else {
if err := authorizeShard(&claims, change.Delete); err != nil {
return nil, err
// For Deletes, use id-only authorization
if !claims.Selector.Matches(pb.MustLabelSet("id", change.Delete.String())) {
return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("not authorized to %s", change.Delete))
}
key = allocator.ItemKey(s.KS, change.Delete.String())
ops = append(ops, clientv3.OpDelete(key))
Expand Down
42 changes: 42 additions & 0 deletions consumer/shard_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumer
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
pb "go.gazette.dev/core/broker/protocol"
Expand Down Expand Up @@ -236,6 +237,47 @@ func TestAPIApplyCases(t *testing.T) {
})
require.EqualError(t, err, `rpc error: code = Unauthenticated desc = not authorized to shard-B`)

// Case: Upsert with labels requires claims to match all labels (not just id).
// Create claims that require env=staging
var claimsRequireStagingEnv = pb.Claims{
Capability: pb.Capability_APPLY,
Selector: pb.MustLabelSelector("id=shard-with-labels,env=staging"),
}
var specWithLabels = &pc.ShardSpec{
Id: "shard-with-labels", // This matches the id in claims
Sources: []pc.ShardSpec_Source{{Journal: sourceA.Name}},
RecoveryLogPrefix: aRecoveryLogPrefix,
HintPrefix: "/hints",
MaxTxnDuration: time.Second,
LabelSet: pb.MustLabelSet("env", "production"), // But env=production doesn't match env=staging
}
_, err = tf.service.Apply(ctx, claimsRequireStagingEnv, &pc.ApplyRequest{
Changes: []pc.ApplyRequest_Change{{Upsert: specWithLabels}},
})
require.EqualError(t, err, `rpc error: code = Unauthenticated desc = not authorized to shard-with-labels`)

// Case: Upsert succeeds when claims selector matches all labels.
var ctxMatchingClaims = pb.Claims{
Capability: pb.Capability_APPLY,
Selector: pb.MustLabelSelector("id=shard-with-labels,env=production"),
}
resp, err := tf.service.Apply(ctx, ctxMatchingClaims, &pc.ApplyRequest{
Changes: []pc.ApplyRequest_Change{{Upsert: specWithLabels}},
})
require.NoError(t, err)
require.Equal(t, pc.Status_OK, resp.Status)

// Case: Delete still only requires id to match (not other labels).
var ctxIdOnlyClaims = pb.Claims{
Capability: pb.Capability_APPLY,
Selector: pb.MustLabelSelector("id=shard-with-labels"),
}
resp, err = tf.service.Apply(ctx, ctxIdOnlyClaims, &pc.ApplyRequest{
Changes: []pc.ApplyRequest_Change{{Delete: "shard-with-labels", ExpectModRevision: -1}},
})
require.NoError(t, err)
require.Equal(t, pc.Status_OK, resp.Status)

// Case: Invalid requests fail with an error.
_, err = tf.service.Apply(ctx, allClaims, &pc.ApplyRequest{
Changes: []pc.ApplyRequest_Change{{Delete: "invalid shard id"}},
Expand Down