Skip to content

Commit 679bef4

Browse files
committed
Merge pull request apache#660 from datastax/JAVA-1179
JAVA-1179: Request objects should be copied when executed.
2 parents 4f9f4ca + 84daab4 commit 679bef4

File tree

6 files changed

+226
-12
lines changed

6 files changed

+226
-12
lines changed

changelog/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
## Changelog
22

3+
### 2.0.12.2 (in progress)
4+
5+
- [bug] JAVA-1179: Request objects should be copied when executed.
6+
7+
38
### 2.0.12.1
49

510
- [bug] JAVA-994: Don't call on(Up|Down|Add|Remove) methods if Cluster is closed/closing.

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,11 +502,10 @@ public ResponseHandler write(ResponseCallback callback) throws ConnectionExcepti
502502

503503
public ResponseHandler write(ResponseCallback callback, boolean startTimeout) throws ConnectionException, BusyConnectionException {
504504

505-
Message.Request request = callback.request();
506-
507505
ResponseHandler handler = new ResponseHandler(this, callback);
508506
dispatcher.add(handler);
509-
request.setStreamId(handler.streamId);
507+
508+
Message.Request request = callback.request().setStreamId(handler.streamId);
510509

511510
/*
512511
* We check for close/defunct *after* having set the handler because closing/defuncting

driver-core/src/main/java/com/datastax/driver/core/Message.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface Decoder<R extends Response> {
4646
public R decode(ByteBuf body);
4747
}
4848

49-
private volatile int streamId;
49+
private volatile int streamId = -1;
5050

5151
protected Message() {
5252
}
@@ -101,6 +101,19 @@ protected Request(Type type, boolean tracingRequested) {
101101
this.tracingRequested = tracingRequested;
102102
}
103103

104+
@Override
105+
public Request setStreamId(int streamId) {
106+
// JAVA-1179: defensively guard against reusing the same Request object twice.
107+
// If no streamId was ever set we can use this object directly, otherwise make a copy.
108+
if (getStreamId() < 0)
109+
return (Request) super.setStreamId(streamId);
110+
else {
111+
Request copy = this.copy();
112+
copy.setStreamId(streamId);
113+
return copy;
114+
}
115+
}
116+
104117
public boolean isTracingRequested() {
105118
return tracingRequested;
106119
}
@@ -140,9 +153,7 @@ ByteBuffer pagingState() {
140153
}
141154
}
142155

143-
Request copy() {
144-
throw new UnsupportedOperationException();
145-
}
156+
abstract Request copy();
146157

147158
Request copy(ConsistencyLevel newConsistencyLevel) {
148159
throw new UnsupportedOperationException();

driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class RequestHandler {
6464
private final long startTime;
6565

6666
private final AtomicBoolean isDone = new AtomicBoolean();
67-
private AtomicInteger executionCount = new AtomicInteger();
67+
private final AtomicInteger executionCount = new AtomicInteger();
6868

6969
public RequestHandler(SessionManager manager, Callback callback, Statement statement) {
7070
this.id = Long.toString(System.identityHashCode(this));
@@ -106,10 +106,6 @@ private void startNewExecution() {
106106

107107
Message.Request request = callback.request();
108108
int position = executionCount.incrementAndGet();
109-
// Clone the request after the first execution, since we set the streamId on it later and we
110-
// don't want to share that across executions.
111-
if (position > 1)
112-
request = request.copy();
113109

114110
SpeculativeExecution execution = new SpeculativeExecution(request, position);
115111
runningExecutions.add(execution);

driver-core/src/main/java/com/datastax/driver/core/Requests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ public int encodedSize(Startup msg) {
4646
};
4747

4848
private final Map<String, String> options;
49+
private final ProtocolOptions.Compression compression;
4950

5051
public Startup(ProtocolOptions.Compression compression) {
5152
super(Message.Request.Type.STARTUP);
53+
this.compression = compression;
5254

5355
ImmutableMap.Builder<String, String> map = new ImmutableMap.Builder<String, String>();
5456
map.put(CQL_VERSION_OPTION, CQL_VERSION);
@@ -57,6 +59,11 @@ public Startup(ProtocolOptions.Compression compression) {
5759
this.options = map.build();
5860
}
5961

62+
@Override
63+
Request copy() {
64+
return new Startup(compression);
65+
}
66+
6067
@Override
6168
public String toString() {
6269
return "STARTUP " + options;
@@ -83,6 +90,11 @@ public Credentials(Map<String, String> credentials) {
8390
super(Message.Request.Type.CREDENTIALS);
8491
this.credentials = credentials;
8592
}
93+
94+
@Override
95+
Request copy() {
96+
return new Credentials(credentials);
97+
}
8698
}
8799

88100
public static class Options extends Message.Request {
@@ -100,6 +112,11 @@ public Options() {
100112
super(Message.Request.Type.OPTIONS);
101113
}
102114

115+
@Override
116+
Request copy() {
117+
return new Options();
118+
}
119+
103120
@Override
104121
public String toString() {
105122
return "OPTIONS";
@@ -431,6 +448,11 @@ public Prepare(String query) {
431448
this.query = query;
432449
}
433450

451+
@Override
452+
Request copy() {
453+
return new Prepare(query);
454+
}
455+
434456
@Override
435457
public String toString() {
436458
return "PREPARE " + query;
@@ -461,6 +483,11 @@ public Register(List<ProtocolEvent.Type> eventTypes) {
461483
this.eventTypes = eventTypes;
462484
}
463485

486+
@Override
487+
Request copy() {
488+
return new Register(eventTypes);
489+
}
490+
464491
@Override
465492
public String toString() {
466493
return "REGISTER " + eventTypes;
@@ -486,5 +513,10 @@ public AuthResponse(byte[] token) {
486513
super(Message.Request.Type.AUTH_RESPONSE);
487514
this.token = token;
488515
}
516+
517+
@Override
518+
Request copy() {
519+
return new AuthResponse(token);
520+
}
489521
}
490522
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (C) 2012-2015 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.driver.core;
17+
18+
import com.datastax.driver.core.exceptions.NoHostAvailableException;
19+
import com.google.common.util.concurrent.FutureCallback;
20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.Uninterruptibles;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import org.testng.annotations.AfterClass;
25+
import org.testng.annotations.BeforeClass;
26+
import org.testng.annotations.Test;
27+
28+
import java.util.List;
29+
import java.util.Random;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.Semaphore;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.testng.Assert.fail;
37+
38+
public class ReusedStreamIdTest {
39+
40+
static Logger logger = LoggerFactory.getLogger(ReusedStreamIdTest.class);
41+
42+
private CCMBridge ccm;
43+
44+
@BeforeClass(groups = "long")
45+
public void setupCluster() throws InterruptedException {
46+
ccm = CCMBridge.builder("test")
47+
.withNodes(2)
48+
.build();
49+
}
50+
51+
@AfterClass(groups = "long")
52+
public void shutdownCluster() {
53+
if (ccm != null)
54+
ccm.stop();
55+
}
56+
57+
/**
58+
* Ensures that if activity tied to future completion blocks netty io threads that this does not cause the
59+
* driver to possibly invoke the wrong handler for a response, as described in JAVA-1179.
60+
* <p>
61+
* This is accomplished by setting up a 2 node cluster and setting a low read timeout (1 second). The test
62+
* submits 10 concurrent requests repeatedly for up to 500 queries and on completion of each request may block
63+
* in a callback randomly (25% of the time) for 1 second, causing a retry on the next host to trigger. If a new
64+
* stream id is not allocated on the retry, its possible it could use an already used stream id and cause the driver
65+
* to invoke the wrong handlers for a response. The test checks for this by ensuring that the column returned in a
66+
* response matches the one queried. If the column received does not match, the test fails. In cases where this
67+
* bug is present, it should be detected within 10 seconds.
68+
*
69+
* @jira_ticket JAVA-1179
70+
* @test_category queries:async_callback
71+
*/
72+
@Test(groups = "long")
73+
public void should_not_receive_wrong_response_when_callbacks_block_io_thread() {
74+
// Low Read Timeout to trigger retry behavior.
75+
Cluster cluster = Cluster.builder()
76+
.addContactPoint(CCMBridge.IP_PREFIX + '1')
77+
.withSocketOptions(new SocketOptions().setReadTimeoutMillis(1000))
78+
.build();
79+
80+
int concurrency = 10;
81+
final Semaphore semaphore = new Semaphore(concurrency);
82+
// RNG to determine sleep times.
83+
final Random random = new Random();
84+
85+
try {
86+
Session session = cluster.connect();
87+
88+
// Use the system.local table and alternate between columns that are queried.
89+
List<ColumnMetadata> columnsToGrab = cluster.getMetadata().getKeyspace("system").getTable("local").getColumns();
90+
assertThat(columnsToGrab.size()).isGreaterThan(1);
91+
92+
final CountDownLatch errorTrigger = new CountDownLatch(1);
93+
94+
long start = System.currentTimeMillis();
95+
// 500 iterations will take roughly 1 minute.
96+
int iterations = 500;
97+
final AtomicInteger completed = new AtomicInteger(0);
98+
99+
for (int i = 1; i <= iterations; i++) {
100+
try {
101+
if (errorTrigger.getCount() == 0) {
102+
fail(String.format("Error triggered at or before %d of %d requests after %dms.", i, iterations,
103+
System.currentTimeMillis() - start));
104+
}
105+
semaphore.acquire();
106+
final String column = columnsToGrab.get(i % columnsToGrab.size()).getName();
107+
String query = String.format("select %s from system.local", column);
108+
ResultSetFuture future = session.executeAsync(query);
109+
110+
Futures.addCallback(future, new FutureCallback<ResultSet>() {
111+
@Override
112+
public void onSuccess(ResultSet result) {
113+
semaphore.release();
114+
// Expect the column that you queried to be present, if its not we got the wrong response
115+
// back.
116+
int columnIndex = result.getColumnDefinitions().getIndexOf(column);
117+
if (columnIndex == -1) {
118+
logger.error("Got response without column {}, got columns {} from Host {}.", column,
119+
result.getColumnDefinitions(), result.getExecutionInfo().getQueriedHost());
120+
errorTrigger.countDown();
121+
return;
122+
}
123+
completed.incrementAndGet();
124+
// Block netty io thread 25% of the time.
125+
int num = random.nextInt(1);
126+
if (num == 0) {
127+
// Sleep exactly one second, should trigger retry.
128+
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
129+
}
130+
}
131+
132+
@Override
133+
public void onFailure(Throwable t) {
134+
semaphore.release();
135+
// NHAEs are inevitable because of low query timeouts and blocked threads.
136+
if (!(t instanceof NoHostAvailableException)) {
137+
logger.error("Unexpected error encountered.", t);
138+
errorTrigger.countDown();
139+
}
140+
}
141+
});
142+
} catch (InterruptedException e) {
143+
fail("Test interrupted", e);
144+
}
145+
if (i % (iterations / 10) == 0) {
146+
logger.info("Submitted {} of {} requests. ({} completed successfully)", i, iterations, completed.get());
147+
}
148+
}
149+
150+
// Wait for 10 seconds for any remaining requests to possibly trigger an error, its likely
151+
// that if we get to this point this will not happen.
152+
Uninterruptibles.awaitUninterruptibly(errorTrigger, 10, TimeUnit.SECONDS);
153+
if (errorTrigger.getCount() == 0) {
154+
fail(String.format("Error triggered after %dms.", System.currentTimeMillis() - start));
155+
}
156+
// Sanity check to ensure that at least some requests succeeded, we expect some failures if both
157+
// hosts timeout as its likely they could be blocked on the event loop.
158+
assertThat(completed.get()).isGreaterThan(0);
159+
} finally {
160+
try {
161+
// Acquire all permits to make sure all inflight requests complete.
162+
if (!semaphore.tryAcquire(concurrency, 10, TimeUnit.SECONDS)) {
163+
fail("Could not acquire all permits within 10 seconds of completion.");
164+
}
165+
} catch (InterruptedException e) {
166+
fail("Interrupted.", e);
167+
}
168+
cluster.close();
169+
}
170+
}
171+
}

0 commit comments

Comments
 (0)