diff --git a/broker/append_api_test.go b/broker/append_api_test.go index 8ca0490b..26167590 100644 --- a/broker/append_api_test.go +++ b/broker/append_api_test.go @@ -165,7 +165,13 @@ func TestAppendBadlyBehavedClientCases(t *testing.T) { failCtxCancel() // Cancel the stream. _, err = stream.CloseAndRecv() - require.EqualError(t, err, `rpc error: code = Canceled desc = context canceled`) + require.Contains(t, []string{ + // Error returned by appendFSM.onStreamContent, on reading an EOF + // without the expected empty AppendRequest (to commit). + `rpc error: code = Unknown desc = append stream: unexpected EOF`, + // Alternative error returned on reading the cancellation. + `rpc error: code = Canceled desc = context canceled`, + }, err.Error()) // Case: client attempts register modification but appends no data. stream, _ = broker.client().Append(ctx) diff --git a/brokertest/broker_test.go b/brokertest/broker_test.go index a22b7bb5..5b5622cf 100644 --- a/brokertest/broker_test.go +++ b/brokertest/broker_test.go @@ -119,8 +119,8 @@ func TestReassignment(t *testing.T) { require.NoError(t, bkA.WaitForConsistency(ctx, "foo/bar", &rt)) require.Equal(t, rt, pb.Route{ Members: []pb.ProcessSpec_ID{ - {"zone", "broker-A"}, - {"zone", "broker-B"}, + {Zone: "zone", Suffix: "broker-A"}, + {Zone: "zone", Suffix: "broker-B"}, }, Primary: 0, }) @@ -135,8 +135,8 @@ func TestReassignment(t *testing.T) { require.NoError(t, bkB.WaitForConsistency(ctx, "foo/bar", &rt)) require.Equal(t, rt, pb.Route{ Members: []pb.ProcessSpec_ID{ - {"zone", "broker-B"}, - {"zone", "broker-C"}, + {Zone: "zone", Suffix: "broker-B"}, + {Zone: "zone", Suffix: "broker-C"}, }, Primary: 0, }) @@ -154,17 +154,11 @@ func TestGracefulStopTimeout(t *testing.T) { var etcd = etcdtest.TestClient() defer etcdtest.Cleanup() - // Shorten the graceful stop timeout during this test. - defer func(d time.Duration) { - server.GracefulStopTimeout = d - }(server.GracefulStopTimeout) - server.GracefulStopTimeout = time.Millisecond * 50 - var ctx = pb.WithDispatchDefault(context.Background()) var bkA = NewBroker(t, etcd, "zone", "broker-A") var bkB = NewBroker(t, etcd, "zone", "broker-B") - // Journal is exclusively owned by |bkA|. + // Journal is owned by either |bkA| or |bkB| (they race). CreateJournals(t, bkA, Journal(pb.JournalSpec{Name: "foo/bar", Replication: 1})) var connA, rjcA = newDialedClient(t, bkA) @@ -200,6 +194,15 @@ func TestGracefulStopTimeout(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, n) + // Shorten the graceful stop timeout for the remainder of this test. + // We don't want to do this before now, because CMux connections + // use its value to SetReadTimeout() of the socket prior to its + // being matched to a mux (which has happened at this point). + defer func(d time.Duration) { + server.GracefulStopTimeout = d + }(server.GracefulStopTimeout) + server.GracefulStopTimeout = time.Millisecond * 50 + // Signal the broker to exit. It will, despite the hung RPCs, // upon hitting the graceful-stop timeout. bkB.Signal()