Skip to content

Commit 0779443

Browse files
committed
PYTHON-1428 Respect maxPoolSize when minPoolSize is set
1 parent cacc0ab commit 0779443

File tree

2 files changed

+39
-6
lines changed

2 files changed

+39
-6
lines changed

pymongo/pool.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ def reset(self):
815815
sock_info.close()
816816

817817
def remove_stale_sockets(self):
818+
"""Removes stale sockets then adds new ones if pool is too small."""
818819
if self.opts.max_idle_time_ms is not None:
819820
with self.lock:
820821
for sock_info in self.sockets.copy():
@@ -823,11 +824,22 @@ def remove_stale_sockets(self):
823824
self.sockets.remove(sock_info)
824825
sock_info.close()
825826

826-
while len(
827-
self.sockets) + self.active_sockets < self.opts.min_pool_size:
828-
sock_info = self.connect()
827+
while True:
829828
with self.lock:
830-
self.sockets.add(sock_info)
829+
if (len(self.sockets) + self.active_sockets >=
830+
self.opts.min_pool_size):
831+
# There are enough sockets in the pool.
832+
break
833+
834+
# We must acquire the semaphore to respect max_pool_size.
835+
if not self._socket_semaphore.acquire(False):
836+
break
837+
try:
838+
sock_info = self.connect()
839+
with self.lock:
840+
self.sockets.add(sock_info)
841+
finally:
842+
self._socket_semaphore.release()
831843

832844
def connect(self):
833845
"""Connect to Mongo and return a new SocketInfo.

test/test_client.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,18 +283,38 @@ def test_max_idle_time_reaper(self):
283283
pass
284284
self.assertEqual(1, len(server._pool.sockets))
285285
self.assertTrue(sock_info in server._pool.sockets)
286+
client.close()
286287

287288
# Assert reaper removes idle socket and replaces it with a new one
288289
client = rs_or_single_client(maxIdleTimeMS=500,
289290
minPoolSize=1)
290291
server = client._get_topology().select_server(any_server_selector)
291292
with server._pool.get_socket({}) as sock_info:
292293
pass
294+
# When the reaper runs at the same time as the get_socket, two
295+
# sockets could be created and checked into the pool.
296+
self.assertGreaterEqual(len(server._pool.sockets), 1)
297+
wait_until(lambda: sock_info not in server._pool.sockets,
298+
"remove stale socket")
299+
wait_until(lambda: 1 <= len(server._pool.sockets),
300+
"replace stale socket")
301+
client.close()
302+
303+
# Assert reaper respects maxPoolSize when adding new sockets.
304+
client = rs_or_single_client(maxIdleTimeMS=500,
305+
minPoolSize=1,
306+
maxPoolSize=1)
307+
server = client._get_topology().select_server(any_server_selector)
308+
with server._pool.get_socket({}) as sock_info:
309+
pass
310+
# When the reaper runs at the same time as the get_socket,
311+
# maxPoolSize=1 should prevent two sockets from being created.
293312
self.assertEqual(1, len(server._pool.sockets))
294313
wait_until(lambda: sock_info not in server._pool.sockets,
295-
"reaper removes stale socket eventually")
314+
"remove stale socket")
296315
wait_until(lambda: 1 == len(server._pool.sockets),
297-
"reaper replaces stale socket with new one")
316+
"replace stale socket")
317+
client.close()
298318

299319
# Assert reaper has removed idle socket and NOT replaced it
300320
client = rs_or_single_client(maxIdleTimeMS=500)
@@ -304,6 +324,7 @@ def test_max_idle_time_reaper(self):
304324
wait_until(
305325
lambda: 0 == len(server._pool.sockets),
306326
"stale socket reaped and new one NOT added to the pool")
327+
client.close()
307328

308329
def test_min_pool_size(self):
309330
with client_knobs(kill_cursor_frequency=.1):

0 commit comments

Comments
 (0)