Skip to content

Commit 23d3c2e

Browse files
committed
PYTHON-1163 update topology from handshake reply
1 parent 20baefa commit 23d3c2e

File tree

5 files changed

+114
-19
lines changed

5 files changed

+114
-19
lines changed

pymongo/pool.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,15 @@ class PoolOptions(object):
217217
'__connect_timeout', '__socket_timeout',
218218
'__wait_queue_timeout', '__wait_queue_multiple',
219219
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
220-
'__event_listeners', '__appname', '__metadata')
220+
'__event_listeners', '__appname', '__metadata',
221+
'__handshake_callback')
221222

222223
def __init__(self, max_pool_size=100, min_pool_size=0,
223224
max_idle_time_ms=None, connect_timeout=None,
224225
socket_timeout=None, wait_queue_timeout=None,
225226
wait_queue_multiple=None, ssl_context=None,
226227
ssl_match_hostname=True, socket_keepalive=False,
227-
event_listeners=None, appname=None):
228+
event_listeners=None, appname=None, handshake_callback=None):
228229

229230
self.__max_pool_size = max_pool_size
230231
self.__min_pool_size = min_pool_size
@@ -242,6 +243,27 @@ def __init__(self, max_pool_size=100, min_pool_size=0,
242243
if appname:
243244
self.__metadata['application'] = {'name': appname}
244245

246+
self.__handshake_callback = handshake_callback
247+
248+
def with_options(self, **kwargs):
249+
options = {
250+
'max_pool_size': self.max_pool_size,
251+
'min_pool_size': self.min_pool_size,
252+
'max_idle_time_ms': self.max_idle_time_ms,
253+
'connect_timeout': self.connect_timeout,
254+
'socket_timeout': self.socket_timeout,
255+
'wait_queue_timeout': self.wait_queue_timeout,
256+
'wait_queue_multiple': self.wait_queue_multiple,
257+
'ssl_context': self.ssl_context,
258+
'ssl_match_hostname': self.ssl_match_hostname,
259+
'socket_keepalive': self.socket_keepalive,
260+
'event_listeners': self.event_listeners,
261+
'appname': self.appname,
262+
'handshake_callback': self.handshake_callback}
263+
264+
options.update(kwargs)
265+
return PoolOptions(**options)
266+
245267
@property
246268
def max_pool_size(self):
247269
"""The maximum allowable number of concurrent connections to each
@@ -335,6 +357,11 @@ def metadata(self):
335357
"""
336358
return self.__metadata.copy()
337359

360+
@property
361+
def handshake_callback(self):
362+
"""Receives an ismaster reply and updates the topology."""
363+
return self.__handshake_callback
364+
338365

339366
class SocketInfo(object):
340367
"""Store a socket with some metadata.
@@ -746,6 +773,8 @@ def connect(self):
746773
('ismaster', 1),
747774
('client', self.opts.metadata)
748775
])
776+
777+
start = _time()
749778
ismaster = IsMaster(
750779
command(sock,
751780
'admin',
@@ -754,13 +783,20 @@ def connect(self):
754783
False,
755784
ReadPreference.PRIMARY,
756785
DEFAULT_CODEC_OPTIONS))
786+
787+
# Can raise ConnectionFailure.
788+
self._handshake_callback(ismaster, _time() - start)
757789
else:
758790
ismaster = None
759791
return SocketInfo(sock, self, ismaster, self.address)
760792
except socket.error as error:
761793
if sock is not None:
762794
sock.close()
763795
_raise_connection_failure(self.address, error)
796+
except:
797+
if sock is not None:
798+
sock.close()
799+
raise
764800

765801
@contextlib.contextmanager
766802
def get_socket(self, all_credentials, checkout=False):
@@ -889,6 +925,14 @@ def _check(self, sock_info):
889925
else:
890926
return self.connect()
891927

928+
def _handshake_callback(self, ismaster, round_trip_time):
929+
callback = self.opts.handshake_callback
930+
if callback:
931+
kept = callback(self.address, ismaster, round_trip_time)
932+
if not kept:
933+
_raise_connection_failure(
934+
self.address, "server removed from topology")
935+
892936
def _raise_wait_queue_timeout(self):
893937
raise ConnectionFailure(
894938
'Timed out waiting for socket from pool with max_size %r and'

pymongo/server_description.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,25 @@ class ServerDescription(object):
3838
- `ismaster`: Optional IsMaster instance
3939
- `round_trip_time`: Optional float
4040
- `error`: Optional, the last error attempting to connect to the server
41+
- `from_handshake`: Optional, whether this is from expanding a connection
42+
pool, rather than from background monitoring.
4143
"""
4244

4345
__slots__ = (
4446
'_address', '_server_type', '_all_hosts', '_tags', '_replica_set_name',
4547
'_primary', '_max_bson_size', '_max_message_size',
4648
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
4749
'_round_trip_time', '_me', '_is_writable', '_is_readable', '_error',
48-
'_set_version', '_election_id', '_last_write_date', '_last_update_time')
50+
'_set_version', '_election_id', '_last_write_date', '_last_update_time',
51+
'_from_handshake')
4952

5053
def __init__(
5154
self,
5255
address,
5356
ismaster=None,
5457
round_trip_time=None,
55-
error=None):
58+
error=None,
59+
from_handshake=False):
5660
self._address = address
5761
if not ismaster:
5862
ismaster = IsMaster({})
@@ -75,6 +79,7 @@ def __init__(
7579
self._me = ismaster.me
7680
self._last_update_time = _time()
7781
self._error = error
82+
self._from_handshake = from_handshake # For tests.
7883

7984
if ismaster.last_write_date:
8085
# Convert from datetime to seconds.

pymongo/topology.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828

2929
from pymongo import common
3030
from pymongo import periodic_executor
31-
from pymongo.pool import PoolOptions
3231
from pymongo.topology_description import (updated_topology_description,
3332
TOPOLOGY_TYPE,
3433
TopologyDescription)
3534
from pymongo.errors import ServerSelectionTimeoutError
3635
from pymongo.monotonic import time as _time
3736
from pymongo.server import Server
37+
from pymongo.server_description import ServerDescription
3838
from pymongo.server_selectors import (any_server_selector,
3939
arbiter_server_selector,
4040
secondary_server_selector,
@@ -237,7 +237,10 @@ def select_server_by_address(self, address,
237237
address)
238238

239239
def on_change(self, server_description):
240-
"""Process a new ServerDescription after an ismaster call completes."""
240+
"""Process a new ServerDescription after an ismaster call completes.
241+
242+
Returns False if the server was removed from the topology.
243+
"""
241244
# We do no I/O holding the lock.
242245
with self._lock:
243246
# Any monitored server was definitely in the topology description
@@ -266,6 +269,9 @@ def on_change(self, server_description):
266269

267270
# Wake waiters in select_servers().
268271
self._condition.notify_all()
272+
return self._description.has_server(server_description.address)
273+
else:
274+
return False
269275

270276
def get_server_by_address(self, address):
271277
"""Get a Server or None.
@@ -337,9 +343,13 @@ def reset_server_and_request_check(self, address):
337343

338344
def update_pool(self):
339345
# Remove any stale sockets and add new sockets if pool is too small.
346+
# Avoid locking around network I/O, or deadlocking when a new connection
347+
# opens and calls Topology.on_change() with the ismaster reply.
340348
with self._lock:
341-
for server in self._servers.values():
342-
server._pool.remove_stale_sockets()
349+
pools = [server._pool for server in self._servers.values()]
350+
351+
for pool in pools:
352+
pool.remove_stale_sockets()
343353

344354
def close(self):
345355
"""Clear pools and terminate monitors. Topology reopens on demand."""
@@ -448,22 +458,36 @@ def _update_servers(self):
448458
self._servers.pop(address)
449459

450460
def _create_pool_for_server(self, address):
451-
return self._settings.pool_class(address, self._settings.pool_options)
461+
# Server Discovery And Monitoring Spec: When a client calls ismaster
462+
# to handshake a new connection for application operations, use the
463+
# ismaster reply to update the topology.
464+
ref = weakref.proxy(self)
452465

453-
def _create_pool_for_monitor(self, address):
454-
options = self._settings.pool_options
466+
def handshake_callback(address, ismaster, round_trip_time):
467+
sd = ServerDescription(address, ismaster, round_trip_time,
468+
from_handshake=True)
469+
470+
try:
471+
# Return False if server was removed from topology.
472+
return ref.on_change(sd)
473+
except ReferenceError:
474+
return True
475+
476+
server_pool_options = self._settings.pool_options.with_options(
477+
handshake_callback=handshake_callback)
455478

479+
return self._settings.pool_class(address, server_pool_options)
480+
481+
def _create_pool_for_monitor(self, address):
456482
# According to the Server Discovery And Monitoring Spec, monitors use
457483
# connect_timeout for both connect_timeout and socket_timeout. The
458484
# pool only has one socket so maxPoolSize and so on aren't needed.
459-
monitor_pool_options = PoolOptions(
460-
connect_timeout=options.connect_timeout,
461-
socket_timeout=options.connect_timeout,
462-
ssl_context=options.ssl_context,
463-
ssl_match_hostname=options.ssl_match_hostname,
485+
opts = self._settings.pool_options
486+
monitor_pool_options = opts.with_options(
487+
socket_timeout=opts.connect_timeout,
464488
socket_keepalive=True,
465-
event_listeners=options.event_listeners,
466-
appname=options.appname)
489+
max_pool_size=None,
490+
min_pool_size=None)
467491

468492
return self._settings.pool_class(address, monitor_pool_options,
469493
handshake=False)

test/pymongo_mocks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ def get_socket(self, all_credentials, checkout=False):
5858
sock_info.mock_port = self.mock_port
5959
yield sock_info
6060

61+
def _handshake_callback(self, ismaster, round_trip_time):
62+
# Don't mock how PyMongo updates topology from ismaster reply.
63+
return True
64+
6165

6266
class MockMonitor(Monitor):
6367
def __init__(

test/test_topology.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from bson.py3compat import imap
2424
from pymongo import common
25+
from pymongo import monitoring
2526
from pymongo.read_preferences import ReadPreference, Secondary
2627
from pymongo.server_type import SERVER_TYPE
2728
from pymongo.topology import Topology
@@ -37,7 +38,7 @@
3738
writable_server_selector)
3839
from pymongo.settings import TopologySettings
3940
from test import client_knobs, unittest
40-
from test.utils import wait_until
41+
from test.utils import rs_or_single_client, wait_until
4142

4243

4344
class MockSocketInfo(object):
@@ -297,6 +298,23 @@ def new_average():
297298
if tries > 10:
298299
self.fail("Didn't ever calculate correct new average")
299300

301+
def test_update_from_handshake(self):
302+
class ServerHandshakes(monitoring.ServerListener, list):
303+
def opened(self, e):
304+
pass
305+
306+
def description_changed(self, e):
307+
if e.new_description._from_handshake:
308+
self.append(e)
309+
310+
def closed(self, e):
311+
pass
312+
313+
handshakes = ServerHandshakes()
314+
client = rs_or_single_client(event_listeners=[handshakes])
315+
client.admin.command('ping')
316+
wait_until(lambda: handshakes, 'record handshakes')
317+
300318

301319
class TestMultiServerTopology(TopologyTest):
302320
def test_readable_writable(self):

0 commit comments

Comments
 (0)