Skip to content

Commit 9ad421a

Browse files
committed
PYTHON-736 Fix exhaust cursor error-handling.
Connection-pool semaphore leak on server error when creating or iterating an exhaust cursor.
1 parent 46a7df0 commit 9ad421a

File tree

6 files changed

+186
-18
lines changed

6 files changed

+186
-18
lines changed

pymongo/cursor.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ def close(self):
5656
self.pool.maybe_return_socket(self.sock)
5757
self.sock, self.pool = None, None
5858

59+
def error(self):
60+
"""Clean up after an error on the managed socket.
61+
"""
62+
if self.sock:
63+
self.sock.close()
64+
65+
# Return the closed socket to avoid a semaphore leak in the pool.
66+
self.close()
67+
5968

6069
# TODO might be cool to be able to do find().include("foo") or
6170
# find().exclude(["bar", "baz"]) or find().slice("a", 1, 2) as an
@@ -914,8 +923,14 @@ def __send_message(self, message):
914923
# due to a socket timeout.
915924
self.__killed = True
916925
raise
917-
else: # exhaust cursor - no getMore message
918-
response = client._exhaust_next(self.__exhaust_mgr.sock)
926+
else:
927+
# Exhaust cursor - no getMore message.
928+
try:
929+
response = client._exhaust_next(self.__exhaust_mgr.sock)
930+
except:
931+
self.__killed = True
932+
self.__exhaust_mgr.error()
933+
raise
919934

920935
try:
921936
response = helpers._unpack_response(response, self.__id,
@@ -938,6 +953,7 @@ def __send_message(self, message):
938953
self.__killed = True
939954
client.disconnect()
940955
raise
956+
941957
self.__id = response["cursor_id"]
942958

943959
# starting from doesn't get set on getmore's for tailable cursors

pymongo/mongo_client.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,27 +1192,35 @@ def _send_message_with_response(self, message,
11921192
sock_info = self.__socket(member)
11931193
exhaust = kwargs.get('exhaust')
11941194
try:
1195-
try:
1196-
if not exhaust and "network_timeout" in kwargs:
1197-
sock_info.sock.settimeout(kwargs["network_timeout"])
1198-
response = self.__send_and_receive(message, sock_info)
1195+
if not exhaust and "network_timeout" in kwargs:
1196+
sock_info.sock.settimeout(kwargs["network_timeout"])
11991197

1200-
if not exhaust:
1201-
if "network_timeout" in kwargs:
1202-
sock_info.sock.settimeout(self.__net_timeout)
1198+
response = self.__send_and_receive(message, sock_info)
12031199

1204-
return (None, (response, sock_info, member.pool))
1205-
except (ConnectionFailure, socket.error), e:
1206-
self.disconnect()
1207-
raise AutoReconnect(str(e))
1208-
finally:
12091200
if not exhaust:
1201+
if "network_timeout" in kwargs:
1202+
sock_info.sock.settimeout(self.__net_timeout)
1203+
12101204
member.pool.maybe_return_socket(sock_info)
12111205

1206+
return (None, (response, sock_info, member.pool))
1207+
except (ConnectionFailure, socket.error), e:
1208+
self.disconnect()
1209+
member.pool.maybe_return_socket(sock_info)
1210+
raise AutoReconnect(str(e))
1211+
except:
1212+
member.pool.maybe_return_socket(sock_info)
1213+
raise
1214+
12121215
def _exhaust_next(self, sock_info):
12131216
"""Used with exhaust cursors to get the next batch off the socket.
1217+
1218+
Can raise AutoReconnect.
12141219
"""
1215-
return self.__receive_message_on_socket(1, None, sock_info)
1220+
try:
1221+
return self.__receive_message_on_socket(1, None, sock_info)
1222+
except socket.error, e:
1223+
raise AutoReconnect(str(e))
12161224

12171225
def start_request(self):
12181226
"""Ensure the current thread or greenlet always uses the same socket

pymongo/mongo_replica_set_client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1711,8 +1711,13 @@ def _send_message_with_response(self, msg, _connection_to_use=None,
17111711

17121712
def _exhaust_next(self, sock_info):
17131713
"""Used with exhaust cursors to get the next batch off the socket.
1714+
1715+
Can raise AutoReconnect.
17141716
"""
1715-
return self.__recv_msg(1, None, sock_info)
1717+
try:
1718+
return self.__recv_msg(1, None, sock_info)
1719+
except socket.error, e:
1720+
raise AutoReconnect(str(e))
17161721

17171722
def start_request(self):
17181723
"""Ensure the current thread or greenlet always uses the same socket

test/test_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
server_started_with_auth,
5353
TestRequestMixin,
5454
_TestLazyConnectMixin,
55+
_TestExhaustCursorMixin,
5556
lazy_client_trial,
5657
NTHREADS,
5758
get_pool,
@@ -1147,5 +1148,10 @@ def test_reconnect(self):
11471148
c.db.collection.find_one()
11481149

11491150

1151+
class TestExhaustCursor(_TestExhaustCursorMixin, unittest.TestCase):
1152+
def _get_client(self, **kwargs):
1153+
return get_client(**kwargs)
1154+
1155+
11501156
if __name__ == "__main__":
11511157
unittest.main()

test/test_replica_set_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
delay, assertReadFrom, assertReadFromAll, read_from_which_host,
5353
remove_all_users, assertRaisesExactly, TestRequestMixin, one,
5454
server_started_with_auth, pools_from_rs_client, get_pool,
55-
_TestLazyConnectMixin)
55+
_TestLazyConnectMixin, _TestExhaustCursorMixin)
5656

5757

5858
class TestReplicaSetClientAgainstStandalone(unittest.TestCase):
@@ -1274,5 +1274,12 @@ def test_max_write_batch_size(self):
12741274
self.assertEqual(c.max_write_batch_size, 2)
12751275

12761276

1277+
class TestReplicaSetClientExhaustCursor(
1278+
_TestExhaustCursorMixin,
1279+
TestReplicaSetClientBase):
1280+
1281+
# Base class implements _get_client already.
1282+
pass
1283+
12771284
if __name__ == "__main__":
12781285
unittest.main()

test/utils.py

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
"""Utilities for testing pymongo
1616
"""
1717

18+
import gc
1819
import os
1920
import struct
2021
import sys
2122
import threading
23+
import time
2224

2325
from nose.plugins.skip import SkipTest
2426
from pymongo import MongoClient, MongoReplicaSetClient
25-
from pymongo.errors import AutoReconnect
27+
from pymongo.errors import AutoReconnect, ConnectionFailure, OperationFailure
2628
from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo
2729
from test import host, port, version
2830

@@ -586,6 +588,130 @@ def test_max_bson_size(self):
586588
c.max_message_size)
587589

588590

591+
def collect_until(fn):
592+
start = time.time()
593+
while not fn():
594+
if (time.time() - start) > 5:
595+
raise AssertionError("timed out")
596+
597+
gc.collect()
598+
599+
600+
class _TestExhaustCursorMixin(object):
601+
"""Test that clients properly handle errors from exhaust cursors.
602+
603+
Inherit from this class and from unittest.TestCase, and override
604+
_get_client(self, **kwargs).
605+
"""
606+
def test_exhaust_query_server_error(self):
607+
# When doing an exhaust query, the socket stays checked out on success
608+
# but must be checked in on error to avoid semaphore leaks.
609+
client = self._get_client(max_pool_size=1)
610+
if is_mongos(client):
611+
raise SkipTest("Can't use exhaust cursors with mongos")
612+
613+
collection = client.pymongo_test.test
614+
pool = get_pool(client)
615+
616+
sock_info = one(pool.sockets)
617+
cursor = collection.find({'$bad_query_operator': 1}, exhaust=True)
618+
self.assertRaises(OperationFailure, cursor.next)
619+
del cursor
620+
collect_until(lambda: sock_info in pool.sockets)
621+
self.assertFalse(sock_info.closed)
622+
623+
# The semaphore was decremented despite the error.
624+
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
625+
626+
def test_exhaust_getmore_server_error(self):
627+
# When doing a getmore on an exhaust cursor, the socket stays checked
628+
# out on success but must be checked in on error to avoid semaphore
629+
# leaks.
630+
client = self._get_client(max_pool_size=1)
631+
if is_mongos(client):
632+
raise SkipTest("Can't use exhaust cursors with mongos")
633+
634+
# A separate client that doesn't affect the test client's pool.
635+
client2 = self._get_client()
636+
637+
collection = client.pymongo_test.test
638+
collection.remove()
639+
640+
# Enough data to ensure it streams down for a few milliseconds.
641+
long_str = 'a' * (256 * 1024)
642+
collection.insert([{'a': long_str} for _ in range(1000)])
643+
644+
pool = get_pool(client)
645+
pool._check_interval_seconds = None # Never check.
646+
sock_info = one(pool.sockets)
647+
648+
cursor = collection.find(exhaust=True)
649+
650+
# Initial query succeeds.
651+
cursor.next()
652+
653+
# Cause a server error on getmore.
654+
client2.pymongo_test.test.drop()
655+
self.assertRaises(OperationFailure, list, cursor)
656+
del cursor
657+
collect_until(lambda: sock_info.closed)
658+
self.assertFalse(sock_info in pool.sockets)
659+
660+
# The semaphore was decremented despite the error.
661+
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
662+
663+
def test_exhaust_query_network_error(self):
664+
# When doing an exhaust query, the socket stays checked out on success
665+
# but must be checked in on error to avoid semaphore leaks.
666+
client = self._get_client(max_pool_size=1)
667+
if is_mongos(client):
668+
raise SkipTest("Can't use exhaust cursors with mongos")
669+
670+
collection = client.pymongo_test.test
671+
pool = get_pool(client)
672+
pool._check_interval_seconds = None # Never check.
673+
674+
# Cause a network error.
675+
sock_info = one(pool.sockets)
676+
sock_info.sock.close()
677+
cursor = collection.find(exhaust=True)
678+
self.assertRaises(ConnectionFailure, cursor.next)
679+
self.assertTrue(sock_info.closed)
680+
681+
# The semaphore was decremented despite the error.
682+
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
683+
684+
def test_exhaust_getmore_network_error(self):
685+
# When doing a getmore on an exhaust cursor, the socket stays checked
686+
# out on success but must be checked in on error to avoid semaphore
687+
# leaks.
688+
client = self._get_client(max_pool_size=1)
689+
if is_mongos(client):
690+
raise SkipTest("Can't use exhaust cursors with mongos")
691+
692+
collection = client.pymongo_test.test
693+
collection.remove()
694+
collection.insert([{} for _ in range(200)]) # More than one batch.
695+
pool = get_pool(client)
696+
pool._check_interval_seconds = None # Never check.
697+
698+
cursor = collection.find(exhaust=True)
699+
700+
# Initial query succeeds.
701+
cursor.next()
702+
703+
# Cause a network error.
704+
sock_info = cursor._Cursor__exhaust_mgr.sock
705+
sock_info.sock.close()
706+
707+
# A getmore fails.
708+
self.assertRaises(ConnectionFailure, list, cursor)
709+
self.assertTrue(sock_info.closed)
710+
711+
# The semaphore was decremented despite the error.
712+
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
713+
714+
589715
# Backport of WarningMessage from python 2.6, with fixed syntax for python 2.4.
590716
class WarningMessage(object):
591717

0 commit comments

Comments
 (0)