@@ -706,6 +706,9 @@ public void testWithOnlyMinSessionTimeout() throws Exception {
706
706
assertEquals (maxSessionTimeOut , quorumPeer .getMaxSessionTimeout (), "maximumSessionTimeOut is wrong" );
707
707
}
708
708
709
+ /**
710
+ * Verify that a failed txn in an isolated leader gets truncated after the leader rejoins the quorum.
711
+ */
709
712
@ Test
710
713
public void testFailedTxnAsPartOfQuorumLoss () throws Exception {
711
714
final int LEADER_TIMEOUT_MS = 10_000 ;
@@ -729,6 +732,8 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
729
732
// increase the tick time to delay the leader going to looking
730
733
int previousTick = servers .mt [leader ].main .quorumPeer .tickTime ;
731
734
servers .mt [leader ].main .quorumPeer .tickTime = LEADER_TIMEOUT_MS ;
735
+ // isolate it from other quorum members by preventing it from rejoining
736
+ servers .mt [leader ].getQuorumPeer ().setSuspended (true );
732
737
// let the previous tick on the leader exhaust itself so the new tick time takes effect
733
738
Thread .sleep (previousTick );
734
739
LOG .warn ("LEADER {}" , leader );
@@ -739,34 +744,18 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
739
744
}
740
745
}
741
746
742
- // 3. start up the followers to form a new quorum
743
- for (int i = 0 ; i < SERVER_COUNT ; i ++) {
744
- if (i != leader ) {
745
- servers .mt [i ].start ();
746
- }
747
- }
748
-
749
- // 4. wait one of the follower to be the new leader
750
- for (int i = 0 ; i < SERVER_COUNT ; i ++) {
751
- if (i != leader ) {
752
- // Recreate a client session since the previous session was not persisted.
753
- servers .restartClient (i , this );
754
- waitForOne (servers .zk [i ], States .CONNECTED );
755
- }
756
- }
757
-
758
- // 5. send a create request to old leader and make sure it's synced to disk,
747
+ // 3. send a create request to old leader and make sure it's synced to disk,
759
748
// which means it acked from itself
760
749
try {
761
750
servers .zk [leader ].create ("/zk" + leader , "zk" .getBytes (), Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT );
762
751
fail ("create /zk" + leader + " should have failed" );
763
- } catch (KeeperException e ) {
752
+ } catch (KeeperException ignored ) {
764
753
}
765
754
766
- // just make sure that we actually did get it in process at the
767
- // leader
755
+ // just make sure that we actually did get it in process at the leader
756
+ //
768
757
// there can be extra sessionClose proposals
769
- assertTrue (outstanding .size () > 0 );
758
+ assertFalse (outstanding .isEmpty () );
770
759
Proposal p = findProposalOfType (outstanding , OpCode .create );
771
760
LOG .info ("Old leader id: {}. All proposals: {}" , leader , outstanding );
772
761
assertNotNull (p , "Old leader doesn't have 'create' proposal" );
@@ -782,36 +771,73 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
782
771
sleepTime += 100 ;
783
772
}
784
773
785
- // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
786
- LOG .info ("Waiting for leader {} to timeout followers" , leader );
774
+ // 4. start up the followers to form a new quorum
775
+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
776
+ if (i != leader ) {
777
+ servers .mt [i ].start ();
778
+ }
779
+ }
780
+
781
+ // 5. wait for one of the followers to become the new leader
782
+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
783
+ if (i != leader ) {
784
+ // Create a new client session to avoid ConnectionLoss, as the server it connects to was restarted.
785
+ servers .restartClient (i , this );
786
+ waitForOne (servers .zk [i ], States .CONNECTED );
787
+ }
788
+ }
789
+
790
+ // 6. make sure the new quorum does not replicate the failed txn
791
+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
792
+ if (i == leader ) {
793
+ continue ;
794
+ }
795
+ assertNull (servers .zk [i ].exists ("/zk" + leader , false ),
796
+ "server " + i + " should not have /zk" + leader );
797
+ }
798
+
799
+ // resume election to rejoin the cluster
800
+ servers .mt [leader ].getQuorumPeer ().setSuspended (false );
801
+
802
+ // 7. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
803
+ LOG .info ("Waiting for leader {} to timeout and rejoin as follower" , leader );
787
804
sleepTime = 0 ;
788
- Follower f = servers .mt [leader ].main .quorumPeer .follower ;
789
- while (f == null || !f .isRunning ()) {
790
- if (sleepTime > LEADER_TIMEOUT_MS * 2 ) {
791
- fail ("Took too long for old leader to time out "
805
+ while (servers .mt [leader ].getQuorumPeer ().getPeerState () != QuorumPeer .ServerState .FOLLOWING ) {
806
+ if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2 ) {
807
+ fail ("Took too long for old leader to time out and rejoin "
792
808
+ servers .mt [leader ].main .quorumPeer .getPeerState ());
793
809
}
794
810
Thread .sleep (100 );
795
811
sleepTime += 100 ;
796
- f = servers .mt [leader ].main .quorumPeer .follower ;
797
812
}
798
813
799
814
int newLeader = servers .findLeader ();
800
815
// make sure a different leader was elected
801
816
assertNotEquals (leader , newLeader );
802
817
803
- // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
804
- servers .mt [leader ].shutdown ();
805
- servers .mt [leader ].start ();
806
- // old client session can expire, restart it
818
+ // Now all preconditions are met. Let's verify that the failed txn got truncated across the whole cluster.
819
+
820
+ boolean restarted = false ;
807
821
servers .restartClient (leader , this );
808
- waitForAll (servers , States .CONNECTED );
822
+ waitForOne (servers .zk [leader ], States .CONNECTED );
823
+ while (true ) {
824
+ // 7. make sure everything is consistent, that is, the failed txn got truncated in the old leader.
825
+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
826
+ assertNull (servers .zk [i ].exists ("/zk" + leader , false ),
827
+ "server " + i + " should not have /zk" + leader );
828
+ }
809
829
810
- // 8. check the node exist in previous leader but not others
811
- // make sure everything is consistent
812
- for (int i = 0 ; i < SERVER_COUNT ; i ++) {
813
- assertNull (servers .zk [i ].exists ("/zk" + leader , false ),
814
- "server " + i + " should not have /zk" + leader );
830
+ if (restarted ) {
831
+ break ;
832
+ }
833
+
834
+ // 8. make sure above holds after restart
835
+ servers .mt [leader ].shutdown ();
836
+ servers .mt [leader ].start ();
837
+ // old client session can expire, restart it
838
+ servers .restartClient (leader , this );
839
+ waitForAll (servers , States .CONNECTED );
840
+ restarted = true ;
815
841
}
816
842
}
817
843
0 commit comments