31
31
import java .util .ArrayList ;
32
32
import java .util .HashMap ;
33
33
import java .util .Iterator ;
34
- import java .util .LinkedHashMap ;
35
34
import java .util .List ;
36
35
import java .util .Map ;
37
36
import java .util .Map .Entry ;
@@ -92,8 +91,8 @@ class MessageDispatcher {
92
91
private final LinkedBlockingQueue <AckRequestData > pendingAcks = new LinkedBlockingQueue <>();
93
92
private final LinkedBlockingQueue <AckRequestData > pendingNacks = new LinkedBlockingQueue <>();
94
93
private final LinkedBlockingQueue <AckRequestData > pendingReceipts = new LinkedBlockingQueue <>();
95
- private final LinkedHashMap <String , ReceiptCompleteData > outstandingReceipts =
96
- new LinkedHashMap <String , ReceiptCompleteData >();
94
+ private final ConcurrentMap <String , ReceiptCompleteData > outstandingReceipts =
95
+ new ConcurrentHashMap <String , ReceiptCompleteData >();
97
96
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger ();
98
97
private final AtomicBoolean extendDeadline = new AtomicBoolean (true );
99
98
private final Lock jobLock ;
@@ -411,7 +410,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
411
410
processBatch (outstandingBatch );
412
411
}
413
412
414
- synchronized void notifyAckSuccess (AckRequestData ackRequestData ) {
413
+ void notifyAckSuccess (AckRequestData ackRequestData ) {
415
414
416
415
if (outstandingReceipts .containsKey (ackRequestData .getAckId ())) {
417
416
outstandingReceipts .get (ackRequestData .getAckId ()).notifyReceiptComplete ();
@@ -437,7 +436,7 @@ synchronized void notifyAckSuccess(AckRequestData ackRequestData) {
437
436
}
438
437
}
439
438
440
- synchronized void notifyAckFailed (AckRequestData ackRequestData ) {
439
+ void notifyAckFailed (AckRequestData ackRequestData ) {
441
440
outstandingReceipts .remove (ackRequestData .getAckId ());
442
441
}
443
442
0 commit comments