Skip to content

Commit 5396444

Browse files
aherlihybehackett
authored andcommitted
PYTHON-978 - Use killCursors command
1 parent 497998d commit 5396444

File tree

4 files changed

+84
-33
lines changed

4 files changed

+84
-33
lines changed

pymongo/mongo_client.py

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -937,35 +937,48 @@ def _process_kill_cursors_queue(self):
937937
server = topology.select_server(
938938
writable_server_selector)
939939

940-
if publish:
941-
start = datetime.datetime.now()
942-
data = message.kill_cursors(cursor_ids)
943-
if publish:
944-
duration = datetime.datetime.now() - start
945-
try:
946-
dbname, collname = address.namespace.split(".", 1)
947-
except AttributeError:
948-
dbname = collname = 'OP_KILL_CURSORS'
949-
command = SON([('killCursors', collname),
950-
('cursors', cursor_ids)])
951-
listeners.publish_command_start(
952-
command, dbname, data[0], address)
953-
start = datetime.datetime.now()
954940
try:
955-
server.send_message(data, self.__all_credentials)
956-
except Exception as exc:
957-
if publish:
958-
dur = (datetime.datetime.now() - start) + duration
959-
listeners.publish_command_failure(
960-
dur, message._convert_exception(exc),
961-
'killCursors', data[0], address)
962-
raise
963-
if publish:
964-
duration = (datetime.datetime.now() - start) + duration
965-
# OP_KILL_CURSORS returns no reply, fake one.
966-
reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
967-
listeners.publish_command_success(
968-
duration, reply, 'killCursors', data[0], address)
941+
namespace = address.namespace
942+
db, coll = namespace.split('.', 1)
943+
except AttributeError:
944+
namespace = None
945+
db = coll = "OP_KILL_CURSORS"
946+
947+
spec = SON([('killCursors', coll),
948+
('cursors', cursor_ids)])
949+
with server.get_socket(self.__all_credentials) as sock_info:
950+
if (sock_info.max_wire_version >= 4 and
951+
namespace is not None):
952+
sock_info.command(db, spec, slave_ok=True)
953+
else:
954+
if publish:
955+
start = datetime.datetime.now()
956+
request_id, msg = message.kill_cursors(cursor_ids)
957+
if publish:
958+
duration = datetime.datetime.now() - start
959+
listeners.publish_command_start(
960+
spec, db, request_id, address)
961+
start = datetime.datetime.now()
962+
963+
try:
964+
sock_info.send_message(msg, 0)
965+
except Exception as exc:
966+
if publish:
967+
dur = ((datetime.datetime.now() - start)
968+
+ duration)
969+
listeners.publish_command_failure(
970+
dur, message._convert_exception(exc),
971+
'killCursors', request_id, address)
972+
raise
973+
974+
if publish:
975+
duration = ((datetime.datetime.now() - start)
976+
+ duration)
977+
# OP_KILL_CURSORS returns no reply, fake one.
978+
reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
979+
listeners.publish_command_success(
980+
duration, reply, 'killCursors', request_id,
981+
address)
969982

970983
except ConnectionFailure as exc:
971984
warnings.warn("couldn't close cursor on %s: %s"

test/test_client.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
CursorNotFound,
4343
NetworkTimeout,
4444
InvalidURI)
45+
from pymongo.message import _CursorAddress
4546
from pymongo.mongo_client import MongoClient
4647
from pymongo.pool import SocketInfo
4748
from pymongo.read_preferences import ReadPreference
@@ -707,7 +708,7 @@ def test_operation_failure(self):
707708
new_sock_info = next(iter(pool.sockets))
708709
self.assertEqual(old_sock_info, new_sock_info)
709710

710-
def test_kill_cursors(self):
711+
def test_kill_cursors_with_cursoraddress(self):
711712
if (client_context.is_mongos
712713
and not client_context.version.at_least(2, 4, 7)):
713714
# Old mongos sends incorrectly formatted error response when
@@ -720,7 +721,40 @@ def test_kill_cursors(self):
720721
self.collection.insert_many([{'_id': i} for i in range(200)])
721722
cursor = self.collection.find().batch_size(1)
722723
next(cursor)
723-
self.client.kill_cursors([cursor.cursor_id])
724+
self.client.kill_cursors(
725+
[cursor.cursor_id],
726+
_CursorAddress(self.client.address, self.collection.full_name))
727+
728+
# Prevent killcursors from reaching the server while a getmore is in
729+
# progress -- the server logs "Assertion: 16089:Cannot kill active
730+
# cursor."
731+
time.sleep(2)
732+
733+
def raises_cursor_not_found():
734+
try:
735+
next(cursor)
736+
return False
737+
except CursorNotFound:
738+
return True
739+
740+
wait_until(raises_cursor_not_found, 'close cursor')
741+
742+
def test_kill_cursors_with_tuple(self):
743+
if (client_context.is_mongos
744+
and not client_context.version.at_least(2, 4, 7)):
745+
# Old mongos sends incorrectly formatted error response when
746+
# cursor isn't found, see SERVER-9738.
747+
raise SkipTest("Can't test kill_cursors against old mongos")
748+
749+
self.collection = self.client.pymongo_test.test
750+
self.collection.drop()
751+
752+
self.collection.insert_many([{'_id': i} for i in range(200)])
753+
cursor = self.collection.find().batch_size(1)
754+
next(cursor)
755+
self.client.kill_cursors(
756+
[cursor.cursor_id],
757+
self.client.address)
724758

725759
# Prevent killcursors from reaching the server while a getmore is in
726760
# progress -- the server logs "Assertion: 16089:Cannot kill active

test/test_cursor_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from pymongo.cursor_manager import CursorManager
2222
from pymongo.errors import CursorNotFound
23+
from pymongo.message import _CursorAddress
2324
from test import (client_context,
2425
client_knobs,
2526
unittest,
@@ -74,7 +75,9 @@ def close(self, cursor_id, address):
7475
# is sent after the killCursors message.
7576
cursor = client.pymongo_test.test.find().batch_size(1)
7677
next(cursor)
77-
client.close_cursor(cursor.cursor_id)
78+
client.close_cursor(
79+
cursor.cursor_id,
80+
_CursorAddress(self.client.address, self.collection.full_name))
7881

7982
def raises_cursor_not_found():
8083
try:

test/test_monitoring.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def test_find_and_get_more(self):
170170
self.assertEqual('find', succeeded.command_name)
171171
self.assertTrue(isinstance(succeeded.request_id, int))
172172
self.assertEqual(cursor.address, succeeded.connection_id)
173-
csr = succeeded.reply[u'cursor']
173+
csr = succeeded.reply["cursor"]
174174
self.assertEqual(csr["id"], cursor_id)
175175
self.assertEqual(csr["ns"], "pymongo_test.test")
176176
self.assertEqual(csr["firstBatch"], [{} for _ in range(4)])
@@ -527,7 +527,8 @@ def test_kill_cursors(self):
527527
self.assertEqual(cursor.address, succeeded.connection_id)
528528
# There could be more than one cursor_id here depending on
529529
# when the thread last ran.
530-
self.assertIn(cursor_id, succeeded.reply['cursorsUnknown'])
530+
self.assertTrue(cursor_id in succeeded.reply['cursorsUnknown']
531+
or cursor_id in succeeded.reply['cursorsKilled'])
531532

532533
def test_non_bulk_writes(self):
533534
coll = self.client.pymongo_test.test

0 commit comments

Comments
 (0)