Skip to content

Commit 2c17fbb

Browse files
daschlMichael Nitschinger
authored andcommitted
JCBC-1642: ClusterManager must use resolved SeedNodes
Previously the cluster manager would use the seed nodes directly from the connection string, but this is a problem because if DNS SRV is used it would try to connect to the dns host instead of the resolved hosts. This changeset updates the code so that the potentially resolved seed hosts are used instead of the original one from the connection string. Note that this is still not 100% accurate since the cluster topology might drift over time, but it fixes the issue at hand. Further improvements can be maded alongside if neeeded in a later changeset. Change-Id: Ie96186d72b01c783172544433a4c456be15ced95 Reviewed-on: http://review.couchbase.org/c/couchbase-java-client/+/129906 Reviewed-by: Graham Pople <grahampople@gmail.com> Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
1 parent 29c85bd commit 2c17fbb

File tree

4 files changed

+59
-30
lines changed

4 files changed

+59
-30
lines changed

src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ public class CouchbaseAsyncCluster implements AsyncCluster {
137137

138138
private final ClusterFacade core;
139139
private final CouchbaseEnvironment environment;
140-
private final ConnectionString connectionString;
141140
private final Map<String, AsyncBucket> bucketCache;
142141
private final boolean sharedEnvironment;
142+
private final List<String> seedNodes;
143143
private Authenticator authenticator;
144144

145145
/**
@@ -271,14 +271,12 @@ public static CouchbaseAsyncCluster fromConnectionString(final CouchbaseEnvironm
271271
if(connectionString.username() != null && !connectionString.username().equals("")) {
272272
this.authenticator = new PasswordAuthenticator(connectionString.username(), "");
273273
}
274-
core = new CouchbaseCore(environment);
275-
SeedNodesRequest request = new SeedNodesRequest(
276-
assembleSeedNodes(connectionString, environment)
277-
);
278-
core.send(request).toBlocking().single();
274+
this.core = new CouchbaseCore(environment);
275+
this.seedNodes = assembleSeedNodes(connectionString, environment);
279276
this.environment = environment;
280-
this.connectionString = connectionString;
281-
this.bucketCache = new ConcurrentHashMap<String, AsyncBucket>();
277+
this.bucketCache = new ConcurrentHashMap<>();
278+
279+
core.send(new SeedNodesRequest(seedNodes)).toBlocking().single();
282280
}
283281

284282
/**
@@ -294,7 +292,7 @@ public static CouchbaseAsyncCluster fromConnectionString(final CouchbaseEnvironm
294292
*/
295293
private static List<String> assembleSeedNodes(ConnectionString connectionString,
296294
CouchbaseEnvironment environment) {
297-
List<String> seedNodes = new ArrayList<String>();
295+
List<String> seedNodes = new ArrayList<>();
298296

299297
if (environment.dnsSrvEnabled()) {
300298
seedNodesViaDnsSrv(connectionString, environment, seedNodes);
@@ -481,9 +479,16 @@ public void call(Boolean aBoolean) {
481479
@Override
482480
public Observable<AsyncClusterManager> clusterManager(final String username,
483481
final String password) {
482+
483+
// To not change too much code, just turn the resolved hosts into a new
484+
// connection string and resolve the hosts from there.
485+
final List<ConnectionString.UnresolvedSocket> seedNodes = ConnectionString
486+
.fromHostnames(this.seedNodes)
487+
.hosts();
488+
484489
return Observable.just(
485490
(AsyncClusterManager) DefaultAsyncClusterManager.create(
486-
username, password, connectionString, environment, core
491+
username, password, seedNodes, environment, core
487492
)
488493
);
489494
}

src/main/java/com/couchbase/client/java/CouchbaseCluster.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public static CouchbaseCluster fromConnectionString(final CouchbaseEnvironment e
244244
);
245245
this.environment = environment;
246246
this.connectionString = connectionString;
247-
this.bucketCache = new ConcurrentHashMap<String, Bucket>();
247+
this.bucketCache = new ConcurrentHashMap<>();
248248
}
249249

250250
@Override
@@ -388,8 +388,7 @@ public ClusterManager clusterManager(final String username, final String passwor
388388
.map(new Func1<AsyncClusterManager, ClusterManager>() {
389389
@Override
390390
public ClusterManager call(AsyncClusterManager asyncClusterManager) {
391-
return DefaultClusterManager.create(username, password, connectionString,
392-
environment, core());
391+
return new DefaultClusterManager(asyncClusterManager, username, password, environment, core());
393392
}
394393
})
395394
.toBlocking()
@@ -404,8 +403,7 @@ public ClusterManager clusterManager() {
404403
.map(new Func1<AsyncClusterManager, ClusterManager>() {
405404
@Override
406405
public ClusterManager call(AsyncClusterManager asyncClusterManager) {
407-
return DefaultClusterManager.create(cred.login(), cred.password(), connectionString,
408-
environment, core());
406+
return new DefaultClusterManager(asyncClusterManager, cred.login(), cred.password(), environment, core());
409407
}
410408
})
411409
.toBlocking()

src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,26 @@ public class DefaultAsyncClusterManager implements AsyncClusterManager {
7777
final String username;
7878
final String password;
7979
final CouchbaseEnvironment environment;
80-
private final ConnectionString connectionString;
80+
private final List<ConnectionString.UnresolvedSocket> seedNodes;
8181

82-
DefaultAsyncClusterManager(final String username, final String password, final ConnectionString connectionString,
82+
DefaultAsyncClusterManager(final String username, final String password, final List<ConnectionString.UnresolvedSocket> seedNodes,
8383
final CouchbaseEnvironment environment, final ClusterFacade core) {
8484
this.username = username;
8585
this.password = password;
8686
this.core = core;
8787
this.environment = environment;
88-
this.connectionString = connectionString;
88+
this.seedNodes = seedNodes;
8989
}
9090

91+
@Deprecated
9192
public static DefaultAsyncClusterManager create(final String username, final String password,
9293
final ConnectionString connectionString, final CouchbaseEnvironment environment, final ClusterFacade core) {
93-
return new DefaultAsyncClusterManager(username, password, connectionString, environment, core);
94+
return create(username, password, connectionString.hosts(), environment, core);
95+
}
96+
97+
public static DefaultAsyncClusterManager create(final String username, final String password,
98+
final List<ConnectionString.UnresolvedSocket> seedNodes, final CouchbaseEnvironment environment, final ClusterFacade core) {
99+
return new DefaultAsyncClusterManager(username, password, seedNodes, environment, core);
94100
}
95101

96102
@Override
@@ -695,17 +701,17 @@ public Boolean call(AddServiceResponse addServiceResponse) {
695701
}
696702

697703
private Observable<Boolean> ensureServiceEnabled() {
698-
if (connectionString.hosts().isEmpty()) {
699-
return Observable.error(new IllegalStateException("No host found in the connection string! " + connectionString.toString()));
704+
if (seedNodes.isEmpty()) {
705+
return Observable.error(new IllegalStateException("No host found in the seed nodes!"));
700706
}
701707

702708
final AtomicInteger integer = new AtomicInteger(0);
703-
return Observable.just(connectionString.hosts())
709+
return Observable.just(seedNodes)
704710
.flatMap(new Func1<List<ConnectionString.UnresolvedSocket>, Observable<Boolean>>() {
705711
@Override
706712
public Observable<Boolean> call(List<ConnectionString.UnresolvedSocket> inetSocketAddresses) {
707713
int hostIndex = integer.getAndIncrement();
708-
if (hostIndex >= connectionString.hosts().size()) {
714+
if (hostIndex >= seedNodes.size()) {
709715
integer.set(0);
710716
return Observable.error(new CouchbaseException("Could not enable ClusterManager service to function properly."));
711717
}

src/main/java/com/couchbase/client/java/cluster/DefaultClusterManager.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.couchbase.client.core.ClusterFacade;
1919
import com.couchbase.client.core.annotations.InterfaceStability;
2020
import com.couchbase.client.core.utils.Blocking;
21-
import com.couchbase.client.java.cluster.api.AsyncClusterApiClient;
2221
import com.couchbase.client.core.utils.ConnectionString;
2322
import com.couchbase.client.java.cluster.api.ClusterApiClient;
2423
import com.couchbase.client.java.env.CouchbaseEnvironment;
@@ -29,19 +28,40 @@
2928
public class DefaultClusterManager implements ClusterManager {
3029

3130
private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
32-
private final DefaultAsyncClusterManager asyncClusterManager;
31+
private final AsyncClusterManager asyncClusterManager;
32+
private final String username;
33+
private final String password;
34+
private final ClusterFacade core;
3335
private final long timeout;
3436

35-
DefaultClusterManager(final String username, final String password, final ConnectionString connectionString,
37+
public DefaultClusterManager(final AsyncClusterManager asyncClusterManager, final String username, final String password,
3638
final CouchbaseEnvironment environment, final ClusterFacade core) {
37-
asyncClusterManager = DefaultAsyncClusterManager.create(username, password, connectionString, environment,
39+
this.asyncClusterManager = asyncClusterManager;
40+
this.timeout = environment.managementTimeout();
41+
this.username = username;
42+
this.password = password;
43+
this.core = core;
44+
}
45+
46+
DefaultClusterManager(final String username, final String password, final List<ConnectionString.UnresolvedSocket> seedNodes,
47+
final CouchbaseEnvironment environment, final ClusterFacade core) {
48+
asyncClusterManager = DefaultAsyncClusterManager.create(username, password, seedNodes, environment,
3849
core);
3950
this.timeout = environment.managementTimeout();
51+
this.username = username;
52+
this.password = password;
53+
this.core = core;
4054
}
4155

56+
@Deprecated
4257
public static DefaultClusterManager create(final String username, final String password,
4358
final ConnectionString connectionString, final CouchbaseEnvironment environment, final ClusterFacade core) {
44-
return new DefaultClusterManager(username, password, connectionString, environment, core);
59+
return create(username, password, connectionString.hosts(), environment, core);
60+
}
61+
62+
public static DefaultClusterManager create(final String username, final String password,
63+
final List<ConnectionString.UnresolvedSocket> seedNodes, final CouchbaseEnvironment environment, final ClusterFacade core) {
64+
return new DefaultClusterManager(username, password, seedNodes, environment, core);
4565
}
4666

4767
@Override
@@ -162,7 +182,7 @@ public User getUser(AuthDomain domain, String userid, long timeout, TimeUnit tim
162182
@Override
163183
@InterfaceStability.Experimental
164184
public ClusterApiClient apiClient() {
165-
return new ClusterApiClient(asyncClusterManager.username, asyncClusterManager.password, asyncClusterManager.core,
166-
this.timeout, TIMEOUT_UNIT); //uses the management timeout as default for API calls as well
185+
// uses the management timeout as default for API calls as well
186+
return new ClusterApiClient(username, password, core, this.timeout, TIMEOUT_UNIT);
167187
}
168188
}

0 commit comments

Comments
 (0)