@@ -1224,6 +1224,22 @@ def __socket(self, member, force=False):
1224
1224
raise
1225
1225
return sock_info
1226
1226
1227
+ def _ensure_connected (self , sync = False ):
1228
+ """Ensure this client instance is connected to a primary.
1229
+ """
1230
+ # This may be the first time we're connecting to the set.
1231
+ if self .__monitor and not self .__monitor .started :
1232
+ try :
1233
+ self .__monitor .start ()
1234
+ # Minor race condition. It's possible that two (or more)
1235
+ # threads could call monitor.start() consecutively. Just pass.
1236
+ except RunTimeError :
1237
+ pass
1238
+ if sync :
1239
+ rs_state = self .__rs_state
1240
+ if not rs_state .primary_member :
1241
+ self .__schedule_refresh (sync )
1242
+
1227
1243
def disconnect (self ):
1228
1244
"""Disconnect from the replica set primary, unpin all members, and
1229
1245
refresh our view of the replica set.
@@ -1390,9 +1406,7 @@ def _send_message(self, msg,
1390
1406
- `with_last_error`: check getLastError status after sending the
1391
1407
message
1392
1408
"""
1393
- # This may be the first time we're connecting to the set.
1394
- if self .__monitor and not self .__monitor .started :
1395
- self .__monitor .start ()
1409
+ self ._ensure_connected ()
1396
1410
1397
1411
if _connection_to_use in (None , - 1 ):
1398
1412
member = self .__find_primary ()
@@ -1494,9 +1508,7 @@ def _send_message_with_response(self, msg, _connection_to_use=None,
1494
1508
used by Cursor for getMore and killCursors messages.
1495
1509
- `_must_use_master`: If True, send to primary.
1496
1510
"""
1497
- # This may be the first time we're connecting to the set.
1498
- if self .__monitor and not self .__monitor .started :
1499
- self .__monitor .start ()
1511
+ self ._ensure_connected ()
1500
1512
1501
1513
rs_state = self .__rs_state
1502
1514
tag_sets = kwargs .get ('tag_sets' , [{}])
0 commit comments