@@ -848,40 +848,38 @@ def _test_max_pool_size_no_rendezvous(self, start_request, end_request):
848
848
for t in threads :
849
849
self .assertTrue (t .passed )
850
850
851
- # Socket-reclamation doesn't work in Jython
852
- if not sys .platform .startswith ('java' ):
853
- cx_pool = c ._MongoClient__pool
851
+ cx_pool = c ._MongoClient__pool
854
852
855
- # Socket-reclamation depends on timely garbage-collection
856
- if 'PyPy' in sys .version :
857
- gc .collect ()
853
+ # Socket-reclamation depends on timely garbage-collection
854
+ if 'PyPy' in sys .version :
855
+ gc .collect ()
858
856
859
- if self .use_greenlets :
860
- # Wait for Greenlet.link() callbacks to execute
861
- the_hub = hub .get_hub ()
862
- if hasattr (the_hub , 'join' ):
863
- # Gevent 1.0
864
- the_hub .join ()
865
- else :
866
- # Gevent 0.13 and less
867
- the_hub .shutdown ()
857
+ if self .use_greenlets :
858
+ # Wait for Greenlet.link() callbacks to execute
859
+ the_hub = hub .get_hub ()
860
+ if hasattr (the_hub , 'join' ):
861
+ # Gevent 1.0
862
+ the_hub .join ()
863
+ else :
864
+ # Gevent 0.13 and less
865
+ the_hub .shutdown ()
868
866
869
- # thread.join completes slightly *before* thread locals are
870
- # cleaned up, so wait up to 5 seconds for them.
867
+ # thread.join completes slightly *before* thread locals are
868
+ # cleaned up, so wait up to 5 seconds for them.
869
+ time .sleep (0.1 )
870
+ cx_pool ._ident .get ()
871
+ start = time .time ()
872
+
873
+ while (
874
+ not cx_pool .sockets
875
+ and cx_pool ._socket_semaphore .counter < max_pool_size
876
+ and (time .time () - start ) < 5
877
+ ):
871
878
time .sleep (0.1 )
872
879
cx_pool ._ident .get ()
873
- start = time .time ()
874
-
875
- while (
876
- not cx_pool .sockets
877
- and cx_pool ._socket_semaphore .counter < max_pool_size
878
- and (time .time () - start ) < 5
879
- ):
880
- time .sleep (0.1 )
881
- cx_pool ._ident .get ()
882
880
883
- self .assertTrue (len (cx_pool .sockets ) >= 1 )
884
- self .assertEqual (max_pool_size , cx_pool ._socket_semaphore .counter )
881
+ self .assertTrue (len (cx_pool .sockets ) >= 1 )
882
+ self .assertEqual (max_pool_size , cx_pool ._socket_semaphore .counter )
885
883
886
884
def test_max_pool_size (self ):
887
885
self ._test_max_pool_size (
@@ -923,61 +921,61 @@ def test_max_pool_size_with_end_request_only(self):
923
921
self ._test_max_pool_size (0 , 1 )
924
922
925
923
924
+ class SocketGetter (threading .Thread ):
925
+ """Utility for _TestMaxOpenSockets and _TestWaitQueueMultiple"""
926
+ def __init__ (self , pool ):
927
+ super (SocketGetter , self ).__init__ ()
928
+ self .setDaemon (True )
929
+ self .state = 'init'
930
+ self .pool = pool
931
+ self .sock = None
932
+
933
+ def run (self ):
934
+ self .state = 'get_socket'
935
+ self .sock = self .pool .get_socket ()
936
+ self .state = 'sock'
937
+
938
+
926
939
class _TestMaxOpenSockets (_TestPoolingBase ):
927
940
"""Test that connection pool doesn't open more than max_size sockets.
928
941
To be run both with threads and with greenlets.
929
942
"""
930
- def get_pool (self , conn_timeout , net_timeout , wait_queue_timeout ):
931
- return pymongo .pool .Pool (('127.0.0.1' , 27017 ),
932
- 2 , net_timeout , conn_timeout ,
933
- False , False ,
934
- wait_queue_timeout = wait_queue_timeout )
935
-
936
- def test_over_max_times_out (self ):
937
- conn_timeout = 2
938
- pool = self .get_pool (conn_timeout , conn_timeout + 5 , conn_timeout + 1 )
939
- s1 = pool .get_socket ()
940
- s2 = pool .get_socket ()
941
- self .assertNotEqual (s1 , s2 )
943
+ def get_pool_with_wait_queue_timeout (self , wait_queue_timeout ):
944
+ return self .get_pool (('127.0.0.1' , 27017 ),
945
+ 1 , None , None ,
946
+ False ,
947
+ wait_queue_timeout = wait_queue_timeout ,
948
+ wait_queue_multiple = None )
949
+
950
+ def test_wait_queue_timeout (self ):
951
+ wait_queue_timeout = 2 # Seconds
952
+ pool = self .get_pool_with_wait_queue_timeout (wait_queue_timeout )
953
+ pool .get_socket ()
942
954
start = time .time ()
943
- self .assertRaises (socket . timeout , pool .get_socket )
944
- end = time .time ()
955
+ self .assertRaises (ConnectionFailure , pool .get_socket )
956
+ duration = time .time () - start
945
957
self .assertTrue (
946
- end - start > conn_timeout ,
947
- "%.2f seconds duration less than expected %f" % (
948
- end - start , conn_timeout ))
949
-
950
- self .assertTrue (end - start < conn_timeout + 5 )
951
-
952
- def test_over_max_no_timeout_blocks (self ):
953
- class Thread (threading .Thread ):
954
- def __init__ (self , pool ):
955
- super (Thread , self ).__init__ ()
956
- self .state = 'init'
957
- self .pool = pool
958
- self .sock = None
959
-
960
- def run (self ):
961
- self .state = 'get_socket'
962
- self .sock = self .pool .get_socket ()
963
- self .state = 'sock'
964
-
965
- pool = self .get_pool (None , 2 , None )
958
+ abs (wait_queue_timeout - duration ) < 1 ,
959
+ "Waited %.2f seconds for a socket, expected %f" % (
960
+ duration , wait_queue_timeout ))
961
+
962
+ def test_blocking (self ):
963
+ # Verify get_socket() with no wait_queue_timeout blocks forever.
964
+ pool = self .get_pool_with_wait_queue_timeout (None )
965
+
966
+ # Reach max_size.
966
967
s1 = pool .get_socket ()
967
- self .assertTrue (None is not s1 )
968
- s2 = pool .get_socket ()
969
- self .assertTrue (None is not s2 )
970
- self .assertNotEqual (s1 , s2 )
971
- t = Thread (pool )
968
+ t = SocketGetter (pool )
972
969
t .start ()
973
970
while t .state != 'get_socket' :
974
971
time .sleep (0.1 )
975
- self . assertEqual ( t . state , 'get_socket' )
976
- time .sleep (5 )
972
+
973
+ time .sleep (1 )
977
974
self .assertEqual (t .state , 'get_socket' )
978
975
pool .maybe_return_socket (s1 )
979
976
while t .state != 'sock' :
980
977
time .sleep (0.1 )
978
+
981
979
self .assertEqual (t .state , 'sock' )
982
980
self .assertEqual (t .sock , s1 )
983
981
@@ -987,88 +985,47 @@ class _TestWaitQueueMultiple(_TestPoolingBase):
987
985
waitQueueMultiple * max_size waiters.
988
986
To be run both with threads and with greenlets.
989
987
"""
990
- def get_pool (self , conn_timeout , net_timeout , wait_queue_timeout ,
991
- wait_queue_multiple ):
992
- return pymongo .pool .Pool (('127.0.0.1' , 27017 ),
993
- 2 , net_timeout , conn_timeout ,
994
- False , False ,
995
- wait_queue_timeout = wait_queue_timeout ,
996
- wait_queue_multiple = wait_queue_multiple )
988
+ def get_pool_with_wait_queue_multiple (self , wait_queue_multiple ):
989
+ return self .get_pool (('127.0.0.1' , 27017 ),
990
+ 2 , None , None ,
991
+ False ,
992
+ wait_queue_timeout = None ,
993
+ wait_queue_multiple = wait_queue_multiple )
997
994
998
995
def test_wait_queue_multiple (self ):
999
- class Thread (threading .Thread ):
1000
- def __init__ (self , pool ):
1001
- super (Thread , self ).__init__ ()
1002
- self .state = 'init'
1003
- self .pool = pool
1004
- self .sock = None
1005
-
1006
- def run (self ):
1007
- self .state = 'get_socket'
1008
- self .sock = self .pool .get_socket ()
1009
- self .state = 'sock'
1010
-
1011
- pool = self .get_pool (None , None , None , 3 )
1012
- socks = []
1013
- for _ in xrange (2 ):
1014
- sock = pool .get_socket ()
1015
- self .assertTrue (sock is not None )
1016
- socks .append (sock )
996
+ pool = self .get_pool_with_wait_queue_multiple (3 )
997
+
998
+ # Reach max_size sockets.
999
+ pool .get_socket ()
1000
+ pool .get_socket ()
1001
+
1002
+ # Reach max_size * wait_queue_multiple waiters.
1017
1003
threads = []
1018
1004
for _ in xrange (6 ):
1019
- t = Thread (pool )
1005
+ t = SocketGetter (pool )
1020
1006
t .start ()
1021
1007
threads .append (t )
1008
+
1022
1009
time .sleep (1 )
1023
1010
for t in threads :
1024
1011
self .assertEqual (t .state , 'get_socket' )
1012
+
1025
1013
self .assertRaises (ExceededMaxWaiters , pool .get_socket )
1026
- while threads :
1027
- for sock in socks :
1028
- pool .maybe_return_socket (sock )
1029
- socks = []
1030
- for t in list (threads ):
1031
- if t .sock is not None :
1032
- socks .append (t .sock )
1033
- t .join ()
1034
- threads .remove (t )
1035
1014
1036
1015
def test_wait_queue_multiple_unset (self ):
1037
- class Thread (threading .Thread ):
1038
- def __init__ (self , pool ):
1039
- super (Thread , self ).__init__ ()
1040
- self .state = 'init'
1041
- self .pool = pool
1042
- self .sock = None
1043
-
1044
- def run (self ):
1045
- self .state = 'get_socket'
1046
- self .sock = self .pool .get_socket ()
1047
- self .state = 'sock'
1048
-
1049
- pool = self .get_pool (None , None , None , None )
1016
+ pool = self .get_pool_with_wait_queue_multiple (None )
1050
1017
socks = []
1051
1018
for _ in xrange (2 ):
1052
1019
sock = pool .get_socket ()
1053
- self .assertTrue (sock is not None )
1054
1020
socks .append (sock )
1055
1021
threads = []
1056
1022
for _ in xrange (30 ):
1057
- t = Thread (pool )
1023
+ t = SocketGetter (pool )
1058
1024
t .start ()
1059
1025
threads .append (t )
1060
1026
time .sleep (1 )
1061
1027
for t in threads :
1062
1028
self .assertEqual (t .state , 'get_socket' )
1063
- while threads :
1064
- for sock in socks :
1065
- pool .maybe_return_socket (sock )
1066
- socks = []
1067
- for t in list (threads ):
1068
- if t .sock is not None :
1069
- socks .append (t .sock )
1070
- t .join ()
1071
- threads .remove (t )
1072
1029
1073
1030
1074
1031
class _TestPoolSocketSharing (_TestPoolingBase ):
0 commit comments