Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion broker/append_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ func TestAppendBadlyBehavedClientCases(t *testing.T) {
failCtxCancel() // Cancel the stream.

_, err = stream.CloseAndRecv()
require.EqualError(t, err, `rpc error: code = Canceled desc = context canceled`)
require.Contains(t, []string{
// Error returned by appendFSM.onStreamContent, on reading an EOF
// without the expected empty AppendRequest (to commit).
`rpc error: code = Unknown desc = append stream: unexpected EOF`,
// Alternative error returned on reading the cancellation.
`rpc error: code = Canceled desc = context canceled`,
}, err.Error())

// Case: client attempts register modification but appends no data.
stream, _ = broker.client().Append(ctx)
Expand Down
25 changes: 14 additions & 11 deletions brokertest/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func TestReassignment(t *testing.T) {
require.NoError(t, bkA.WaitForConsistency(ctx, "foo/bar", &rt))
require.Equal(t, rt, pb.Route{
Members: []pb.ProcessSpec_ID{
{"zone", "broker-A"},
{"zone", "broker-B"},
{Zone: "zone", Suffix: "broker-A"},
{Zone: "zone", Suffix: "broker-B"},
},
Primary: 0,
})
Expand All @@ -135,8 +135,8 @@ func TestReassignment(t *testing.T) {
require.NoError(t, bkB.WaitForConsistency(ctx, "foo/bar", &rt))
require.Equal(t, rt, pb.Route{
Members: []pb.ProcessSpec_ID{
{"zone", "broker-B"},
{"zone", "broker-C"},
{Zone: "zone", Suffix: "broker-B"},
{Zone: "zone", Suffix: "broker-C"},
},
Primary: 0,
})
Expand All @@ -154,17 +154,11 @@ func TestGracefulStopTimeout(t *testing.T) {
var etcd = etcdtest.TestClient()
defer etcdtest.Cleanup()

// Shorten the graceful stop timeout during this test.
defer func(d time.Duration) {
server.GracefulStopTimeout = d
}(server.GracefulStopTimeout)
server.GracefulStopTimeout = time.Millisecond * 50

var ctx = pb.WithDispatchDefault(context.Background())
var bkA = NewBroker(t, etcd, "zone", "broker-A")
var bkB = NewBroker(t, etcd, "zone", "broker-B")

// Journal is exclusively owned by |bkA|.
// Journal is owned by either |bkA| or |bkB| (they race).
CreateJournals(t, bkA, Journal(pb.JournalSpec{Name: "foo/bar", Replication: 1}))

var connA, rjcA = newDialedClient(t, bkA)
Expand Down Expand Up @@ -200,6 +194,15 @@ func TestGracefulStopTimeout(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, n)

// Shorten the graceful stop timeout for the remainder of this test.
// We don't want to do this before now, because CMux connections
// use its value to SetReadTimeout() of the socket prior to its
// being matched to a mux (which has happened at this point).
defer func(d time.Duration) {
server.GracefulStopTimeout = d
}(server.GracefulStopTimeout)
server.GracefulStopTimeout = time.Millisecond * 50

// Signal the broker to exit. It will, despite the hung RPCs,
// upon hitting the graceful-stop timeout.
bkB.Signal()
Expand Down