Skip to content

Commit 49cee29

Browse files
committed
PYTHON-1408 Cursor iteration should complete when another thread closes the cursor.
Closing a cursor should not raise an error when killCursors fails.
1 parent 28969a7 commit 49cee29

File tree

5 files changed

+51
-7
lines changed

5 files changed

+51
-7
lines changed

pymongo/command_cursor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ def __del__(self):
7171
def __die(self, synchronous=False):
7272
"""Closes this cursor.
7373
"""
74-
if self.__id and not self.__killed:
74+
already_killed = self.__killed
75+
self.__killed = True
76+
if self.__id and not already_killed:
7577
address = _CursorAddress(
7678
self.__address, self.__collection.full_name)
7779
if synchronous:
@@ -81,7 +83,6 @@ def __die(self, synchronous=False):
8183
# The cursor will be closed later in a different session.
8284
self.__collection.database.client.close_cursor(
8385
self.__id, address)
84-
self.__killed = True
8586
self.__end_session(synchronous)
8687

8788
def __end_session(self, synchronous):

pymongo/cursor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,9 @@ def _clone_base(self, session):
293293
def __die(self, synchronous=False):
294294
"""Closes this cursor.
295295
"""
296-
if self.__id and not self.__killed:
296+
already_killed = self.__killed
297+
self.__killed = True
298+
if self.__id and not already_killed:
297299
if self.__exhaust and self.__exhaust_mgr:
298300
# If this is an exhaust cursor and we haven't completely
299301
# exhausted the result set we *must* close the socket
@@ -311,7 +313,6 @@ def __die(self, synchronous=False):
311313
self.__id, address)
312314
if self.__exhaust and self.__exhaust_mgr:
313315
self.__exhaust_mgr.close()
314-
self.__killed = True
315316
if self.__session and not self.__explicit_session:
316317
self.__session._end_session(lock=synchronous)
317318
self.__session = None

pymongo/mongo_client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@
5757
ConfigurationError,
5858
ConnectionFailure,
5959
InvalidOperation,
60-
InvalidURI,
6160
NetworkTimeout,
6261
NotMasterError,
6362
OperationFailure,
63+
PyMongoError,
6464
ServerSelectionTimeoutError)
6565
from pymongo.read_preferences import ReadPreference
6666
from pymongo.server_selectors import (writable_preferred_server_selector,
@@ -1150,8 +1150,12 @@ def _close_cursor_now(self, cursor_id, address=None, session=None):
11501150
if self.__cursor_manager is not None:
11511151
self.__cursor_manager.close(cursor_id, address)
11521152
else:
1153-
self._kill_cursors(
1154-
[cursor_id], address, self._get_topology(), session)
1153+
try:
1154+
self._kill_cursors(
1155+
[cursor_id], address, self._get_topology(), session)
1156+
except PyMongoError:
1157+
# Make another attempt to kill the cursor later.
1158+
self.__kill_cursors_queue.append((address, [cursor_id]))
11551159

11561160
def kill_cursors(self, cursor_ids, address=None):
11571161
"""DEPRECATED - Send a kill cursors message soon with the given ids.

test/test_change_stream.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,21 @@ def test_aggregate_cursor_blocks(self):
169169
self.assertEqual(changes[0]['operationType'], 'insert')
170170
self.assertEqual(changes[0]['fullDocument'], inserted_doc)
171171

172+
def test_concurrent_close(self):
173+
"""Ensure a ChangeStream can be closed from another thread."""
174+
# Use a short await time to speed up the test.
175+
with self.coll.watch(max_await_time_ms=250) as change_stream:
176+
def iterate_cursor():
177+
for change in change_stream:
178+
pass
179+
t = threading.Thread(target=iterate_cursor)
180+
t.start()
181+
self.coll.insert_one({})
182+
time.sleep(1)
183+
change_stream.close()
184+
t.join(3)
185+
self.assertFalse(t.is_alive())
186+
172187
def test_update_resume_token(self):
173188
"""ChangeStream must continuously track the last seen resumeToken."""
174189
with self.coll.watch() as change_stream:

test/test_cursor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import random
2020
import re
2121
import sys
22+
import time
23+
import threading
2224

2325
sys.path[0:0] = [""]
2426

@@ -1130,6 +1132,27 @@ def test_tailable(self):
11301132
cursor.rewind()
11311133
self.assertEqual([4, 5, 6], [doc["x"] for doc in cursor[0:3]])
11321134

1135+
def test_concurrent_close(self):
1136+
"""Ensure a tailable can be closed from another thread."""
1137+
db = self.db
1138+
db.drop_collection("test")
1139+
db.create_collection("test", capped=True, size=1000, max=3)
1140+
self.addCleanup(db.drop_collection, "test")
1141+
cursor = db.test.find(cursor_type=CursorType.TAILABLE)
1142+
1143+
def iterate_cursor():
1144+
while cursor.alive:
1145+
for doc in cursor:
1146+
pass
1147+
t = threading.Thread(target=iterate_cursor)
1148+
t.start()
1149+
time.sleep(1)
1150+
cursor.close()
1151+
self.assertFalse(cursor.alive)
1152+
t.join(3)
1153+
self.assertFalse(t.is_alive())
1154+
1155+
11331156
def test_distinct(self):
11341157
self.db.drop_collection("test")
11351158

0 commit comments

Comments
 (0)