Skip to content

Commit 066ce86

Browse files
Introduce a way for AMQConnection to notify its recovery aware wrapper
Fixes rabbitmq#135.
1 parent 593a634 commit 066ce86

File tree

3 files changed

+56
-21
lines changed

3 files changed

+56
-21
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import java.net.InetAddress;
2222
import java.net.SocketException;
2323
import java.net.SocketTimeoutException;
24-
import java.util.Collection;
25-
import java.util.Collections;
26-
import java.util.HashMap;
27-
import java.util.Map;
24+
import java.util.*;
2825
import java.util.concurrent.*;
2926

3027
import com.rabbitmq.client.AMQP;
@@ -45,6 +42,7 @@
4542
import com.rabbitmq.client.SaslMechanism;
4643
import com.rabbitmq.client.ShutdownSignalException;
4744
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
45+
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
4846
import com.rabbitmq.utility.BlockingCell;
4947

5048
final class Copyright {
@@ -65,6 +63,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6563
private Thread mainLoopThread;
6664
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
6765

66+
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
67+
new ArrayList<RecoveryCanBeginListener>();
68+
6869
/**
6970
* Retrieve a copy of the default table of client properties that
7071
* will be sent to the server during connection startup. This
@@ -576,10 +577,31 @@ public void run() {
576577
_frameHandler.close();
577578
_appContinuation.set(null);
578579
notifyListeners();
580+
// assuming that shutdown listeners do not do anything
581+
// asynchronously, e.g. start new threads, this effectively
582+
// guarantees that we only begin recovery when all shutdown
583+
// listeners have executed
584+
notifyRecoveryCanBeginListeners();
579585
}
580586
}
581587
}
582588

589+
private void notifyRecoveryCanBeginListeners() {
590+
ShutdownSignalException sse = this.getCloseReason();
591+
for(RecoveryCanBeginListener fn : this.recoveryCanBeginListeners) {
592+
fn.recoveryCanBegin(sse);
593+
}
594+
}
595+
596+
public void addRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
597+
this.recoveryCanBeginListeners.add(fn);
598+
}
599+
600+
@SuppressWarnings(value = "unused")
601+
public void removeRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
602+
this.recoveryCanBeginListeners.remove(fn);
603+
}
604+
583605
/**
584606
* Called when a frame-read operation times out
585607
* @throws MissedHeartbeatException if heart-beats have been missed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,10 @@ public int getLocalPort() {
376376

377377
private void addAutomaticRecoveryListener() {
378378
final AutorecoveringConnection c = this;
379-
ShutdownListener automaticRecoveryListener = new ShutdownListener() {
380-
public void shutdownCompleted(ShutdownSignalException cause) {
379+
// this listener will run after shutdown listeners,
380+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
381+
RecoveryCanBeginListener starter = new RecoveryCanBeginListener() {
382+
public void recoveryCanBegin(ShutdownSignalException cause) {
381383
try {
382384
if (shouldTriggerConnectionRecovery(cause)) {
383385
c.beginAutomaticRecovery();
@@ -388,10 +390,7 @@ public void shutdownCompleted(ShutdownSignalException cause) {
388390
}
389391
};
390392
synchronized (this) {
391-
if(!this.shutdownHooks.contains(automaticRecoveryListener)) {
392-
this.shutdownHooks.add(automaticRecoveryListener);
393-
}
394-
this.delegate.addShutdownListener(automaticRecoveryListener);
393+
this.delegate.addRecoveryCanBeginListener(starter);
395394
}
396395
}
397396

@@ -441,18 +440,20 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
441440

442441
synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
443442
Thread.sleep(this.params.getNetworkRecoveryInterval());
444-
if (!this.recoverConnection())
445-
return;
446-
447-
this.recoverShutdownListeners();
448-
this.recoverBlockedListeners();
449-
this.recoverChannels();
450-
if(this.params.isTopologyRecoveryEnabled()) {
451-
this.recoverEntities();
452-
this.recoverConsumers();
453-
}
443+
if (!this.recoverConnection()) {
444+
return;
445+
}
454446

455-
this.notifyRecoveryListeners();
447+
this.addAutomaticRecoveryListener();
448+
this.recoverShutdownListeners();
449+
this.recoverBlockedListeners();
450+
this.recoverChannels();
451+
if(this.params.isTopologyRecoveryEnabled()) {
452+
this.recoverEntities();
453+
this.recoverConsumers();
454+
}
455+
456+
this.notifyRecoveryListeners();
456457
}
457458

458459
private void recoverShutdownListeners() {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
import com.rabbitmq.client.ShutdownSignalException;
4+
5+
/**
6+
* Used internally to indicate when connection recovery can
7+
* begin. See {@link https://github.com/rabbitmq/rabbitmq-java-client/issues/135}.
8+
* This is package-local by design.
9+
*/
10+
public interface RecoveryCanBeginListener {
11+
void recoveryCanBegin(ShutdownSignalException cause);
12+
}

0 commit comments

Comments
 (0)