11/*
2- * Copyright 2011-2013 the original author or authors.
2+ * Copyright 2011-2014 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1515 */
1616package org .springframework .data .redis .listener ;
1717
18- import java .util .ArrayList ;
19- import java .util .Collection ;
20- import java .util .Collections ;
21- import java .util .List ;
22- import java .util .Map ;
23- import java .util .Set ;
18+ import java .util .*;
2419import java .util .concurrent .ConcurrentHashMap ;
2520import java .util .concurrent .CopyOnWriteArraySet ;
2621import java .util .concurrent .Executor ;
3530import org .springframework .core .task .SimpleAsyncTaskExecutor ;
3631import org .springframework .core .task .TaskExecutor ;
3732import org .springframework .data .redis .RedisConnectionFailureException ;
38- import org .springframework .data .redis .connection .Message ;
39- import org .springframework .data .redis .connection .MessageListener ;
40- import org .springframework .data .redis .connection .RedisConnection ;
41- import org .springframework .data .redis .connection .RedisConnectionFactory ;
42- import org .springframework .data .redis .connection .Subscription ;
33+ import org .springframework .data .redis .connection .*;
4334import org .springframework .data .redis .connection .util .ByteArrayWrapper ;
4435import org .springframework .data .redis .serializer .RedisSerializer ;
4536import org .springframework .data .redis .serializer .StringRedisSerializer ;
6859 * @author Costin Leau
6960 * @author Jennifer Hickey
7061 * @author Way Joke
62+ * @author Thomas Darimont
7163 */
7264public class RedisMessageListenerContainer implements InitializingBean , DisposableBean , BeanNameAware , SmartLifecycle {
7365
@@ -86,6 +78,11 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
8678 */
8779public static final long DEFAULT_RECOVERY_INTERVAL = 5000 ;
8880
81+ /**
82+ * The default subscription wait time: 2000 ms = 2 seconds.
83+ */
84+ public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000L ;
85+
8986private long initWait = TimeUnit .SECONDS .toMillis (5 );
9087
9188private Executor subscriptionExecutor ;
@@ -127,6 +124,8 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
127124
128125private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL ;
129126
127+ private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME ;
128+
130129public void afterPropertiesSet () {
131130if (taskExecutor == null ) {
132131manageExecutor = true ;
@@ -746,24 +745,26 @@ public void run() {
746745throw new IllegalStateException ("Retrieved connection is already subscribed; aborting listening" );
747746}
748747
749- // NB: some drivers' Xsubscribe calls block
750- synchronized (monitor ) {
751- monitor .notify ();
752- }
748+ boolean asyncConnection = ConnectionUtils .isAsync (connectionFactory );
753749
754- // subscribe one way or the other
755- // and schedule the rest
756- if (!channelMapping .isEmpty ()) {
757- // schedule the rest of the subscription
758- if (!patternMapping .isEmpty ()) {
759- subscriptionExecutor .execute (new PatternSubscriptionTask ());
750+ // NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
751+ if (!asyncConnection ){
752+ synchronized (monitor ) {
753+ monitor .notify ();
760754}
761- connection .subscribe (new DispatchMessageListener (), unwrap (channelMapping .keySet ()));
762755}
763- else {
764- connection .pSubscribe (new DispatchMessageListener (), unwrap (patternMapping .keySet ()));
756+
757+ SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription ();
758+
759+
760+ if (asyncConnection ){
761+ SpinBarrier .waitFor (subscriptionPresent , getMaxSubscriptionRegistrationWaitingTime ());
762+
763+ synchronized (monitor ){
764+ monitor .notify ();
765+ }
765766}
766- } catch (Throwable t ) {
767+ } catch (Throwable t ) {
767768handleSubscriptionException (t );
768769} finally {
769770// this block is executed once the subscription thread has ended, this may or may not mean
@@ -775,6 +776,63 @@ public void run() {
775776}
776777}
777778
779+ /**
780+ * Performs a potentially asynchronous registration of a subscription.
781+ *
782+ * @return #SubscriptionPresentCondition that can serve as a handle to check whether the subscription is ready.
783+ */
784+ private SubscriptionPresentCondition eventuallyPerformSubscription () {
785+
786+ SubscriptionPresentCondition condition = null ;
787+
788+ if (channelMapping .isEmpty ()) {
789+
790+ condition = new PatternSubscriptionPresentCondition ();
791+ connection .pSubscribe (new DispatchMessageListener (), unwrap (patternMapping .keySet ()));
792+ } else {
793+
794+ if (patternMapping .isEmpty ()) {
795+ condition = new SubscriptionPresentCondition ();
796+ } else {
797+ // schedule the rest of the subscription
798+ subscriptionExecutor .execute (new PatternSubscriptionTask ());
799+ condition = new PatternSubscriptionPresentCondition ();
800+ }
801+
802+ connection .subscribe (new DispatchMessageListener (), unwrap (channelMapping .keySet ()));
803+ }
804+
805+ return condition ;
806+ }
807+
808+ /**
809+ *
810+ * Checks whether the current connection has an associated subscription.
811+ *
812+ * @author Thomas Darimont
813+ */
814+ private class SubscriptionPresentCondition implements Condition {
815+
816+ public boolean passes () {
817+ return connection .isSubscribed ();
818+ }
819+ }
820+
821+ /**
822+ * Checks whether the current connection has an associated pattern subscription.
823+ *
824+ * @author Thomas Darimont
825+ *
826+ * @see org.springframework.data.redis.listener.RedisMessageListenerContainer.SubscriptionTask.SubscriptionPresentTestCondition
827+ */
828+ private class PatternSubscriptionPresentCondition extends SubscriptionPresentCondition {
829+
830+ @ Override
831+ public boolean passes () {
832+ return super .passes () && !CollectionUtils .isEmpty (connection .getSubscription ().getPatterns ());
833+ }
834+ }
835+
778836private byte [][] unwrap (Collection <ByteArrayWrapper > holders ) {
779837if (CollectionUtils .isEmpty (holders )) {
780838return new byte [0 ][];
@@ -934,4 +992,79 @@ public void run() {
934992public void setRecoveryInterval (long recoveryInterval ) {
935993this .recoveryInterval = recoveryInterval ;
936994}
995+
996+ public long getMaxSubscriptionRegistrationWaitingTime () {
997+ return maxSubscriptionRegistrationWaitingTime ;
998+ }
999+
1000+ /**
1001+ * Specify the max time to wait for subscription registrations, in <b>milliseconds</b>.
1002+ * The default is 2000ms, that is, 2 second.
1003+ *
1004+ * @param maxSubscriptionRegistrationWaitingTime
1005+ *
1006+ * @see #DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
1007+ */
1008+ public void setMaxSubscriptionRegistrationWaitingTime (long maxSubscriptionRegistrationWaitingTime ) {
1009+ this .maxSubscriptionRegistrationWaitingTime = maxSubscriptionRegistrationWaitingTime ;
1010+ }
1011+
1012+ /**
1013+ * @author Jennifer Hickey
1014+ * @author Thomas Darimont
1015+ *
1016+ * Note: Placed here to avoid API exposure.
1017+ */
1018+ private static abstract class SpinBarrier {
1019+
1020+ /**
1021+ * Periodically tests, in 100ms intervals, for a condition until it is met or a timeout occurs.
1022+ *
1023+ * @param condition
1024+ * The condition to periodically test
1025+ * @param timeout
1026+ * The timeout
1027+ * @return true if condition passes, false if condition does not pass within
1028+ * timeout
1029+ */
1030+ static boolean waitFor (Condition condition , long timeout ) {
1031+
1032+ long startTime =System .currentTimeMillis ();
1033+
1034+ while (!timedOut (startTime , timeout )) {
1035+ if (condition .passes ()) {
1036+ return true ;
1037+ }
1038+ try {
1039+ Thread .sleep (100 );
1040+ } catch (InterruptedException e ) {
1041+ Thread .currentThread ().interrupt ();
1042+ }
1043+ }
1044+ return false ;
1045+ }
1046+
1047+ private static boolean timedOut (long startTime , long timeout ) {
1048+ return (startTime + timeout ) < System .currentTimeMillis ();
1049+ }
1050+ }
1051+
1052+ /**
1053+ *
1054+ * A condition to test periodically, used in conjunction with
1055+ * {@link org.springframework.data.redis.listener.RedisMessageListenerContainer.SpinBarrier}
1056+ *
1057+ * @author Jennifer Hickey
1058+ * @author Thomas Darimont
1059+ *
1060+ * Note: Placed here to avoid API exposure.
1061+ */
1062+ private static interface Condition {
1063+
1064+ /**
1065+ *
1066+ * @return true if condition passes
1067+ */
1068+ boolean passes ();
1069+ }
9371070}
0 commit comments