54
54
@ InterfaceAudience .Public
55
55
public class NodeLocatorHelper {
56
56
57
- private final ConfigurationProvider configProvider ;
58
57
private final AtomicReference <BucketConfig > bucketConfig ;
59
58
60
59
private NodeLocatorHelper (final Bucket bucket ) {
61
- configProvider = bucket
62
- .core ()
63
- .<GetConfigProviderResponse >send (new GetConfigProviderRequest ())
64
- .toBlocking ()
65
- .single ()
66
- .provider ();
60
+ ConfigurationProvider configProvider = bucket
61
+ .core ()
62
+ .<GetConfigProviderResponse >send (new GetConfigProviderRequest ())
63
+ .toBlocking ()
64
+ .single ()
65
+ .provider ();
67
66
68
- bucketConfig = new AtomicReference <BucketConfig >(configProvider .config ().bucketConfig (bucket .name ()));
67
+ bucketConfig = new AtomicReference <>(configProvider .config ().bucketConfig (bucket .name ()));
69
68
70
69
configProvider
71
70
.configs ()
@@ -121,7 +120,7 @@ public List<InetAddress> replicaNodesForId(final String id) {
121
120
122
121
if (config instanceof CouchbaseBucketConfig ) {
123
122
CouchbaseBucketConfig cbc = (CouchbaseBucketConfig ) config ;
124
- List <InetAddress > replicas = new ArrayList <InetAddress >();
123
+ List <InetAddress > replicas = new ArrayList <>();
125
124
for (int i = 1 ; i <= cbc .numberOfReplicas (); i ++) {
126
125
replicas .add (replicaNodeForId (id , i ));
127
126
}
@@ -131,6 +130,30 @@ public List<InetAddress> replicaNodesForId(final String id) {
131
130
}
132
131
}
133
132
133
+ /**
134
+ * Returns all target replica nodes {@link InetAddress} which are currently available on the bucket.
135
+ *
136
+ * @param id the document ID to check.
137
+ * @return the list of nodes for the given document ID.
138
+ */
139
+ public List <InetAddress > availableReplicaNodesForId (final String id ) {
140
+ BucketConfig config = bucketConfig .get ();
141
+
142
+ if (config instanceof CouchbaseBucketConfig ) {
143
+ CouchbaseBucketConfig cbc = (CouchbaseBucketConfig ) config ;
144
+ List <InetAddress > replicas = new ArrayList <>();
145
+ for (int i = 1 ; i <= cbc .numberOfReplicas (); i ++) {
146
+ InetAddress foundReplica = replicaNodeForId (id , i , false );
147
+ if (foundReplica != null ) {
148
+ replicas .add (foundReplica );
149
+ }
150
+ }
151
+ return replicas ;
152
+ } else {
153
+ throw new UnsupportedOperationException ("Bucket type not supported: " + config .getClass ().getName ());
154
+ }
155
+ }
156
+
134
157
/**
135
158
* Returns the target replica node {@link InetAddress} for a given document ID and replica number on the bucket.
136
159
*
@@ -139,6 +162,19 @@ public List<InetAddress> replicaNodesForId(final String id) {
139
162
* @return the node for the given document id.
140
163
*/
141
164
public InetAddress replicaNodeForId (final String id , int replicaNum ) {
165
+ return replicaNodeForId (id , replicaNum , true );
166
+ }
167
+
168
+
169
+ /**
170
+ * Returns the target replica node {@link InetAddress} for a given document ID and replica number on the bucket.
171
+ *
172
+ * @param id the document id to convert.
173
+ * @param replicaNum the replica number.
174
+ * @param throwOnNotAvailable if on -1 and -2 an exception should be thrown.
175
+ * @return the node for the given document id.
176
+ */
177
+ private InetAddress replicaNodeForId (final String id , int replicaNum , boolean throwOnNotAvailable ) {
142
178
if (replicaNum < 1 || replicaNum > 3 ) {
143
179
throw new IllegalArgumentException ("Replica number must be between 1 and 3." );
144
180
}
@@ -150,10 +186,18 @@ public InetAddress replicaNodeForId(final String id, int replicaNum) {
150
186
int partitionId = (int ) hashId (id ) & cbc .numberOfPartitions () - 1 ;
151
187
int nodeId = cbc .nodeIndexForReplica (partitionId , replicaNum - 1 , false );
152
188
if (nodeId == -1 ) {
153
- throw new IllegalStateException ("No partition assigned to node for Document ID: " + id );
189
+ if (throwOnNotAvailable ) {
190
+ throw new IllegalStateException ("No partition assigned to node for Document ID: " + id );
191
+ } else {
192
+ return null ;
193
+ }
154
194
}
155
195
if (nodeId == -2 ) {
156
- throw new IllegalStateException ("Replica not configured for this bucket." );
196
+ if (throwOnNotAvailable ) {
197
+ throw new IllegalStateException ("Replica not configured for this bucket." );
198
+ } else {
199
+ return null ;
200
+ }
157
201
}
158
202
try {
159
203
return InetAddress .getByName (cbc .nodeAtIndex (nodeId ).hostname ());
@@ -171,7 +215,7 @@ public InetAddress replicaNodeForId(final String id, int replicaNum) {
171
215
* @return all currently known nodes.
172
216
*/
173
217
public List <InetAddress > nodes () {
174
- List <InetAddress > allNodes = new ArrayList <InetAddress >();
218
+ List <InetAddress > allNodes = new ArrayList <>();
175
219
BucketConfig config = bucketConfig .get ();
176
220
for (NodeInfo nodeInfo : config .nodes ()) {
177
221
try {
0 commit comments