Skip to content

Commit a2a83c0

Browse files
committed
PYTHON-952 - Publish events for uses of _first_batch
1 parent 8a21411 commit a2a83c0

File tree

4 files changed

+80
-11
lines changed

4 files changed

+80
-11
lines changed

pymongo/collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,8 +1340,8 @@ def list_indexes(self):
13401340
codec_options = CodecOptions(SON)
13411341
coll = self.with_options(codec_options)
13421342
with self._socket_for_primary_reads() as (sock_info, slave_ok):
1343+
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
13431344
if sock_info.max_wire_version > 2:
1344-
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
13451345
cursor = self._command(sock_info, cmd, slave_ok,
13461346
ReadPreference.PRIMARY,
13471347
codec_options)["cursor"]
@@ -1350,7 +1350,7 @@ def list_indexes(self):
13501350
namespace = _UJOIN % (self.__database.name, "system.indexes")
13511351
res = helpers._first_batch(
13521352
sock_info, namespace, {"ns": self.__full_name},
1353-
0, slave_ok, codec_options, ReadPreference.PRIMARY)
1353+
0, slave_ok, codec_options, ReadPreference.PRIMARY, cmd)
13541354
data = res["data"]
13551355
cursor = {
13561356
"id": res["cursor_id"],

pymongo/database.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -456,18 +456,19 @@ def command(self, command, value=1, check=True,
456456
def _list_collections(self, sock_info, slave_okay, criteria=None):
457457
"""Internal listCollections helper."""
458458
criteria = criteria or {}
459+
cmd = SON([("listCollections", 1), ("cursor", {})])
460+
if criteria:
461+
cmd["filter"] = criteria
462+
459463
if sock_info.max_wire_version > 2:
460-
cmd = SON([("listCollections", 1), ("cursor", {})])
461-
if criteria:
462-
cmd["filter"] = criteria
463464
coll = self["$cmd"]
464465
cursor = self._command(sock_info, cmd, slave_okay)["cursor"]
465466
return CommandCursor(coll, cursor, sock_info.address)
466467
else:
467468
coll = self["system.namespaces"]
468469
res = _first_batch(sock_info, coll.full_name,
469470
criteria, 0, slave_okay,
470-
CodecOptions(), ReadPreference.PRIMARY)
471+
CodecOptions(), ReadPreference.PRIMARY, cmd)
471472
data = res["data"]
472473
cursor = {
473474
"id": res["cursor_id"],

pymongo/helpers.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
"""Bits and pieces used by the driver that don't really fit elsewhere."""
1616

1717
import collections
18+
import datetime
1819
import struct
19-
from pymongo.message import _Query
2020

2121
import bson
22-
import pymongo
2322
from bson.codec_options import CodecOptions
2423
from bson.py3compat import itervalues, string_type, iteritems, u
2524
from bson.son import SON
25+
from pymongo import ASCENDING, monitoring
2626
from pymongo.errors import (CursorNotFound,
2727
DuplicateKeyError,
2828
ExecutionTimeout,
@@ -31,6 +31,7 @@
3131
WriteError,
3232
WriteConcernError,
3333
WTimeoutError)
34+
from pymongo.message import _Query
3435

3536

3637
_UUNDER = u("_")
@@ -50,7 +51,7 @@ def _index_list(key_or_list, direction=None):
5051
return [(key_or_list, direction)]
5152
else:
5253
if isinstance(key_or_list, string_type):
53-
return [(key_or_list, pymongo.ASCENDING)]
54+
return [(key_or_list, ASCENDING)]
5455
elif not isinstance(key_or_list, (list, tuple)):
5556
raise TypeError("if no direction is specified, "
5657
"key_or_list must be an instance of list")
@@ -233,16 +234,43 @@ def _check_gle_response(response):
233234

234235

235236
def _first_batch(sock_info, namespace, query,
236-
ntoreturn, slave_ok, codec_options, read_preference):
237+
ntoreturn, slave_ok, codec_options, read_preference, cmd):
237238
"""Simple query helper for retrieving a first (and possibly only) batch."""
238239
query = _Query(
239240
0, namespace, 0, ntoreturn, query, None,
240241
codec_options, read_preference, 0, ntoreturn)
242+
243+
name = next(iter(cmd))
244+
duration = None
245+
publish = monitoring.enabled()
246+
if publish:
247+
start = datetime.datetime.now()
248+
241249
request_id, msg, max_doc_size = query.get_message(slave_ok,
242250
sock_info.is_mongos)
251+
252+
if publish:
253+
encoding_duration = datetime.datetime.now() - start
254+
monitoring.publish_command_start(
255+
cmd, namespace.split('.', 1)[0], request_id, sock_info.address)
256+
start = datetime.datetime.now()
257+
243258
sock_info.send_message(msg, max_doc_size)
244259
response = sock_info.receive_message(1, request_id)
245-
return _unpack_response(response, None, codec_options)
260+
try:
261+
result = _unpack_response(response, None, codec_options)
262+
except (NotMasterError, OperationFailure) as exc:
263+
if publish:
264+
duration = (datetime.datetime.now() - start) + encoding_duration
265+
monitoring.publish_command_failure(
266+
duration, exc.details, name, request_id, sock_info.address)
267+
raise
268+
if publish:
269+
duration = (datetime.datetime.now() - start) + encoding_duration
270+
monitoring.publish_command_success(
271+
duration, result, name, request_id, sock_info.address)
272+
273+
return result
246274

247275

248276
def _check_write_command_response(results):

test/test_monitoring.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,46 @@ def test_write_errors(self):
11871187
for error in errors:
11881188
self.assertEqual(fields, set(error))
11891189

1190+
def test_first_batch_helper(self):
1191+
# Regardless of server version and use of helpers._first_batch
1192+
# this test should still pass.
1193+
self.listener.results.clear()
1194+
self.client.pymongo_test.collection_names()
1195+
results = self.listener.results
1196+
started = results['started'][0]
1197+
succeeded = results['succeeded'][0]
1198+
self.assertEqual(0, len(results['failed']))
1199+
self.assertIsInstance(started, monitoring.CommandStartedEvent)
1200+
expected = SON([('listCollections', 1), ('cursor', {})])
1201+
self.assertEqual(expected, started.command)
1202+
self.assertEqual('pymongo_test', started.database_name)
1203+
self.assertEqual('listCollections', started.command_name)
1204+
self.assertIsInstance(started.request_id, int)
1205+
self.assertEqual(self.client.address, started.connection_id)
1206+
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
1207+
self.assertIsInstance(succeeded.duration_micros, int)
1208+
self.assertEqual(started.command_name, succeeded.command_name)
1209+
self.assertEqual(started.request_id, succeeded.request_id)
1210+
self.assertEqual(started.connection_id, succeeded.connection_id)
1211+
1212+
self.listener.results.clear()
1213+
tuple(self.client.pymongo_test.test.list_indexes())
1214+
started = results['started'][0]
1215+
succeeded = results['succeeded'][0]
1216+
self.assertEqual(0, len(results['failed']))
1217+
self.assertIsInstance(started, monitoring.CommandStartedEvent)
1218+
expected = SON([('listIndexes', 'test'), ('cursor', {})])
1219+
self.assertEqual(expected, started.command)
1220+
self.assertEqual('pymongo_test', started.database_name)
1221+
self.assertEqual('listIndexes', started.command_name)
1222+
self.assertIsInstance(started.request_id, int)
1223+
self.assertEqual(self.client.address, started.connection_id)
1224+
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
1225+
self.assertIsInstance(succeeded.duration_micros, int)
1226+
self.assertEqual(started.command_name, succeeded.command_name)
1227+
self.assertEqual(started.request_id, succeeded.request_id)
1228+
self.assertEqual(started.connection_id, succeeded.connection_id)
1229+
11901230

11911231
if __name__ == "__main__":
11921232
unittest.main()

0 commit comments

Comments
 (0)