Skip to content

Commit 4c66f20

Browse files
committed
JAVA-1182: Throw error when synchronous call made on I/O thread.
1 parent 679bef4 commit 4c66f20

File tree

6 files changed

+59
-4
lines changed

6 files changed

+59
-4
lines changed

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 2.0.12.2 (in progress)
44

55
- [bug] JAVA-1179: Request objects should be copied when executed.
6+
- [improvement] JAVA-1182: Throw error when synchronous call made on I/O thread.
67

78

89
### 2.0.12.1
@@ -11,6 +12,7 @@
1112
- [improvement] JAVA-805: Document that metrics are null until Cluster is initialized.
1213
- [bug] JAVA-1072: Ensure defunct connections are properly evicted from the pool.
1314

15+
1416
### 2.0.12
1517

1618
- [bug] JAVA-950: Fix Cluster.connect with a case-sensitive keyspace.

driver-core/src/main/java/com/datastax/driver/core/AbstractSession.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public ResultSet execute(String query, Object... values) {
5151
*/
5252
@Override
5353
public ResultSet execute(Statement statement) {
54+
checkNotInEventLoop();
5455
return executeAsync(statement).getUninterruptibly();
5556
}
5657

@@ -75,6 +76,7 @@ public ResultSetFuture executeAsync(String query, Object... values) {
7576
*/
7677
@Override
7778
public PreparedStatement prepare(String query) {
79+
checkNotInEventLoop();
7880
try {
7981
return Uninterruptibles.getUninterruptibly(prepareAsync(query));
8082
} catch (ExecutionException e) {
@@ -87,6 +89,7 @@ public PreparedStatement prepare(String query) {
8789
*/
8890
@Override
8991
public PreparedStatement prepare(RegularStatement statement) {
92+
checkNotInEventLoop();
9093
try {
9194
return Uninterruptibles.getUninterruptibly(prepareAsync(statement));
9295
} catch (ExecutionException e) {
@@ -132,4 +135,13 @@ public void close() {
132135
Thread.currentThread().interrupt();
133136
}
134137
}
138+
139+
/**
140+
* Checks that the current thread is not one of the Netty I/O threads used by the driver.
141+
* <p/>
142+
* This is called from the synchronous methods of this class to prevent deadlock issues.
143+
*/
144+
protected void checkNotInEventLoop() {
145+
// This method is concrete only to avoid a breaking change. See subclass for the actual implementation.
146+
}
135147
}

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ public static class Factory {
679679

680680
public final Timer timer;
681681

682-
private final EventLoopGroup eventLoopGroup;
682+
final EventLoopGroup eventLoopGroup;
683683
private final Class<? extends Channel> channelClass;
684684

685685
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

driver-core/src/main/java/com/datastax/driver/core/SessionManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.common.collect.ImmutableList;
2828
import com.google.common.collect.Lists;
2929
import com.google.common.util.concurrent.*;
30+
import io.netty.util.concurrent.EventExecutor;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

@@ -46,6 +47,9 @@ class SessionManager extends AbstractSession {
4647

4748
private static final Logger logger = LoggerFactory.getLogger(Session.class);
4849

50+
private static final boolean CHECK_IO_DEADLOCKS = SystemProperties.getBoolean(
51+
"com.datastax.driver.CHECK_IO_DEADLOCKS", true);
52+
4953
final Cluster cluster;
5054
final ConcurrentMap<Host, HostConnectionPool> pools;
5155
final HostConnectionPool.PoolState poolsState;
@@ -611,6 +615,21 @@ void cleanupIdleConnections(long now) {
611615
}
612616
}
613617

618+
@Override
619+
protected void checkNotInEventLoop() {
620+
if (!CHECK_IO_DEADLOCKS)
621+
return;
622+
for (EventExecutor executor : cluster.manager.connectionFactory.eventLoopGroup) {
623+
if (executor.inEventLoop()) {
624+
throw new IllegalStateException(
625+
"Detected a synchronous Session call (execute() or prepare()) on an I/O thread, " +
626+
"this can cause deadlocks or unpredictable behavior. " +
627+
"Make sure your Future callbacks only use async calls, or schedule them on a " +
628+
"different executor.");
629+
}
630+
}
631+
}
632+
614633
private static class State implements Session.State {
615634

616635
private final SessionManager session;

driver-core/src/main/java/com/datastax/driver/core/SystemProperties.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
/**
2222
* Allows overriding internal settings via system properties.
2323
* <p/>
24-
* Warning: this is meant for integration tests only, NOT FOR PRODUCTION USE.
24+
* This is generally reserved for tests or "expert" usage.
2525
*/
2626
class SystemProperties {
2727
private static final Logger logger = LoggerFactory.getLogger(SystemProperties.class);
@@ -34,7 +34,7 @@ static int getInt(String key, int defaultValue) {
3434
}
3535
try {
3636
int value = Integer.parseInt(stringValue);
37-
logger.warn("{} is defined, using value {}", key, value);
37+
logger.info("{} is defined, using value {}", key, value);
3838
return value;
3939
} catch (NumberFormatException e) {
4040
logger.warn("{} is defined but could not parse value {}, using default value {}", key, stringValue, defaultValue);
@@ -50,7 +50,7 @@ static boolean getBoolean(String key, boolean defaultValue) {
5050
}
5151
try {
5252
boolean value = Boolean.parseBoolean(stringValue);
53-
logger.warn("{} is defined, using value {}", key, value);
53+
logger.info("{} is defined, using value {}", key, value);
5454
return value;
5555
} catch (NumberFormatException e) {
5656
logger.warn("{} is defined but could not parse value {}, using default value {}", key, stringValue, defaultValue);

driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.datastax.driver.core.exceptions.InvalidQueryException;
1919
import com.google.common.base.Function;
20+
import com.google.common.base.Throwables;
2021
import com.google.common.collect.Lists;
2122
import com.google.common.util.concurrent.*;
2223
import org.testng.annotations.DataProvider;
@@ -27,6 +28,7 @@
2728
import java.util.concurrent.*;
2829

2930
import static org.assertj.core.api.Assertions.assertThat;
31+
import static org.assertj.core.api.Assertions.fail;
3032
import static org.testng.Assert.assertEquals;
3133
import static org.testng.Assert.assertTrue;
3234

@@ -129,6 +131,26 @@ public void should_propagate_error_to_chained_query_if_session_init_fails() thro
129131
}
130132
}
131133

134+
@Test(groups = "short")
135+
public void should_fail_when_synchronous_call_on_io_thread() throws Exception {
136+
ResultSetFuture f = session.executeAsync("select release_version from system.local");
137+
ListenableFuture<Void> f2 = Futures.transform(f, new Function<ResultSet, Void>() {
138+
@Override
139+
public Void apply(ResultSet input) {
140+
session.execute("select release_version from system.local");
141+
return null;
142+
}
143+
});
144+
try {
145+
f2.get();
146+
fail("Expected a failed future");
147+
} catch (Exception e) {
148+
assertThat(Throwables.getRootCause(e))
149+
.isInstanceOf(IllegalStateException.class)
150+
.hasMessageContaining("Detected a synchronous Session call");
151+
}
152+
}
153+
132154
private ListenableFuture<Integer> connectAndQuery(String keyspace, Executor executor) {
133155
ListenableFuture<Session> sessionFuture = cluster.connectAsync(keyspace);
134156
ListenableFuture<ResultSet> queryFuture = Futures.transform(sessionFuture, new AsyncFunction<Session, ResultSet>() {

0 commit comments

Comments
 (0)