Skip to content

Commit dc080e0

Browse files
authored
ConcurrentModificationException when using broadcast (TooTallNate#902)
ConcurrentModificationException when using broadcast
2 parents f580296 + d634714 commit dc080e0

File tree

2 files changed

+178
-4
lines changed

2 files changed

+178
-4
lines changed

src/main/java/org/java_websocket/server/WebSocketServer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -924,13 +924,17 @@ private void doBroadcast(Object data, Collection<WebSocket> clients) {
924924
return;
925925
}
926926
Map<Draft, List<Framedata>> draftFrames = new HashMap<Draft, List<Framedata>>();
927-
for( WebSocket client : clients ) {
928-
if( client != null ) {
927+
List<WebSocket> clientCopy;
928+
synchronized (clients) {
929+
clientCopy = new ArrayList<WebSocket>(clients);
930+
}
931+
for (WebSocket client : clientCopy) {
932+
if (client != null) {
929933
Draft draft = client.getDraft();
930934
fillFrames(draft, draftFrames, sData, bData);
931935
try {
932-
client.sendFrame( draftFrames.get( draft ) );
933-
} catch ( WebsocketNotConnectedException e ) {
936+
client.sendFrame(draftFrames.get(draft));
937+
} catch (WebsocketNotConnectedException e) {
934938
//Ignore this exception in this case
935939
}
936940
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright (c) 2010-2019 Nathan Rajlich
3+
*
4+
* Permission is hereby granted, free of charge, to any person
5+
* obtaining a copy of this software and associated documentation
6+
* files (the "Software"), to deal in the Software without
7+
* restriction, including without limitation the rights to use,
8+
* copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
* copies of the Software, and to permit persons to whom the
10+
* Software is furnished to do so, subject to the following
11+
* conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be
14+
* included in all copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17+
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
18+
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19+
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20+
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
21+
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23+
* OTHER DEALINGS IN THE SOFTWARE.
24+
*
25+
*/
26+
27+
package org.java_websocket.issues;
28+
29+
import org.java_websocket.WebSocket;
30+
import org.java_websocket.client.WebSocketClient;
31+
import org.java_websocket.handshake.ClientHandshake;
32+
import org.java_websocket.handshake.ServerHandshake;
33+
import org.java_websocket.server.WebSocketServer;
34+
import org.java_websocket.util.SocketUtil;
35+
import org.junit.Test;
36+
import org.junit.runner.RunWith;
37+
import org.junit.runners.Parameterized;
38+
39+
import java.io.IOException;
40+
import java.net.BindException;
41+
import java.net.InetSocketAddress;
42+
import java.net.URI;
43+
import java.net.URISyntaxException;
44+
import java.nio.ByteBuffer;
45+
import java.util.ArrayList;
46+
import java.util.Collection;
47+
import java.util.ConcurrentModificationException;
48+
import java.util.List;
49+
import java.util.concurrent.CountDownLatch;
50+
51+
import static org.junit.Assert.assertFalse;
52+
53+
@RunWith(Parameterized.class)
54+
public class Issue879Test {
55+
56+
private static final int NUMBER_OF_TESTS = 20;
57+
58+
@Parameterized.Parameter
59+
public int numberOfConnections;
60+
61+
62+
@Test(timeout= 10000)
63+
public void QuickStopTest() throws IOException, InterruptedException, URISyntaxException {
64+
final boolean[] wasBindException = {false};
65+
final boolean[] wasConcurrentException = new boolean[1];
66+
final CountDownLatch countDownLatch = new CountDownLatch(1);
67+
68+
class SimpleServer extends WebSocketServer {
69+
public SimpleServer(InetSocketAddress address) {
70+
super(address);
71+
}
72+
73+
@Override
74+
public void onOpen(WebSocket conn, ClientHandshake handshake) {
75+
broadcast("new connection: " + handshake.getResourceDescriptor()); //This method sends a message to all clients connected
76+
}
77+
78+
@Override
79+
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
80+
}
81+
82+
@Override
83+
public void onMessage(WebSocket conn, String message) {
84+
}
85+
86+
@Override
87+
public void onMessage(WebSocket conn, ByteBuffer message) {
88+
}
89+
90+
@Override
91+
public void onError(WebSocket conn, Exception ex) {
92+
if (ex instanceof BindException) {
93+
wasBindException[0] = true;
94+
}
95+
if (ex instanceof ConcurrentModificationException) {
96+
wasConcurrentException[0] = true;
97+
}
98+
}
99+
100+
@Override
101+
public void onStart() {
102+
countDownLatch.countDown();
103+
}
104+
}
105+
int port = SocketUtil.getAvailablePort();
106+
SimpleServer serverA = new SimpleServer(new InetSocketAddress( port));
107+
SimpleServer serverB = new SimpleServer(new InetSocketAddress( port));
108+
serverA.start();
109+
countDownLatch.await();
110+
List<WebSocketClient> clients = startNewConnections(numberOfConnections, port);
111+
Thread.sleep(100);
112+
int numberOfConnected = 0;
113+
for (WebSocketClient client : clients) {
114+
if (client.isOpen())
115+
numberOfConnected++;
116+
}
117+
// Number will differ since we use connect instead of connectBlocking
118+
// System.out.println(numberOfConnected + " " + numberOfConnections);
119+
120+
serverA.stop();
121+
serverB.start();
122+
clients.clear();
123+
assertFalse("There was a BindException", wasBindException[0]);
124+
assertFalse("There was a ConcurrentModificationException", wasConcurrentException[0]);
125+
}
126+
127+
@Parameterized.Parameters
128+
public static Collection<Integer[]> data() {
129+
List<Integer[]> ret = new ArrayList<Integer[]>(NUMBER_OF_TESTS);
130+
for (int i = 0; i < NUMBER_OF_TESTS; i++) ret.add(new Integer[]{25+ i*25});
131+
return ret;
132+
}
133+
134+
private List<WebSocketClient> startNewConnections(int numberOfConnections, int port) throws URISyntaxException, InterruptedException {
135+
List<WebSocketClient> clients = new ArrayList<WebSocketClient>(numberOfConnections);
136+
for (int i = 0; i < numberOfConnections; i++) {
137+
WebSocketClient client = new SimpleClient(new URI("ws://localhost:" + port));
138+
client.connect();
139+
Thread.sleep(1);
140+
clients.add(client);
141+
}
142+
return clients;
143+
}
144+
class SimpleClient extends WebSocketClient {
145+
146+
public SimpleClient(URI serverUri) {
147+
super(serverUri);
148+
}
149+
150+
@Override
151+
public void onOpen(ServerHandshake handshakedata) {
152+
153+
}
154+
155+
@Override
156+
public void onMessage(String message) {
157+
158+
}
159+
160+
@Override
161+
public void onClose(int code, String reason, boolean remote) {
162+
163+
}
164+
165+
@Override
166+
public void onError(Exception ex) {
167+
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)