Skip to content

Commit 0e4f79c

Browse files
committed
PYTHON-952 - Non-bulk write operation monitoring
1 parent 5dba74c commit 0e4f79c

File tree

8 files changed

+659
-70
lines changed

8 files changed

+659
-70
lines changed

pymongo/_cmessagemodule.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,16 +421,18 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
421421
codec_options_t options;
422422
buffer_t buffer;
423423
int length_location, message_length;
424+
unsigned char check_keys = 0;
424425
PyObject* result;
425426

426-
if (!PyArg_ParseTuple(args, "Iet#iiOOO&",
427+
if (!PyArg_ParseTuple(args, "Iet#iiOOO&|b",
427428
&flags,
428429
"utf-8",
429430
&collection_name,
430431
&collection_name_length,
431432
&num_to_skip, &num_to_return,
432433
&query, &field_selector,
433-
convert_codec_options, &options)) {
434+
convert_codec_options, &options,
435+
&check_keys)) {
434436
return NULL;
435437
}
436438
buffer = buffer_new();
@@ -463,7 +465,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
463465
}
464466

465467
begin = buffer_get_position(buffer);
466-
if (!write_dict(state->_cbson, buffer, query, 0, &options, 1)) {
468+
if (!write_dict(state->_cbson, buffer, query, check_keys, &options, 1)) {
467469
destroy_codec_options(&options);
468470
buffer_free(buffer);
469471
PyMem_Free(collection_name);

pymongo/bulk.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def _merge_legacy(run, full_result, result, index):
123123
full_result['nUpserted'] += affected
124124
# Versions of MongoDB before 2.6 don't return the _id for an
125125
# upsert if _id is not an ObjectId.
126-
elif result.get("updatedExisting") == False and affected == 1:
126+
elif result.get("updatedExisting") is False and affected == 1:
127127
op = run.ops[index]
128128
# If _id is in both the update document *and* the query spec
129129
# the update document _id takes precedence.

pymongo/collection.py

Lines changed: 123 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Collection level utilities for Mongo."""
1616

1717
import collections
18+
import datetime
1819
import warnings
1920

2021
from bson.code import Code
@@ -27,13 +28,14 @@
2728
from bson.son import SON
2829
from pymongo import (common,
2930
helpers,
30-
message)
31+
message,
32+
monitoring)
3133
from pymongo.bulk import BulkOperationBuilder, _Bulk
3234
from pymongo.command_cursor import CommandCursor
3335
from pymongo.cursor import Cursor
3436
from pymongo.errors import ConfigurationError, InvalidName, OperationFailure
3537
from pymongo.helpers import _check_write_command_response
36-
from pymongo.message import _INSERT, _UPDATE, _DELETE
38+
from pymongo.message import _INSERT
3739
from pymongo.operations import _WriteOp, IndexModel
3840
from pymongo.read_preferences import ReadPreference
3941
from pymongo.results import (BulkWriteResult,
@@ -380,13 +382,85 @@ def bulk_write(self, requests, ordered=True):
380382
return BulkWriteResult(bulk_api_result, True)
381383
return BulkWriteResult({}, False)
382384

385+
def _legacy_write(
386+
self, sock_info, name, command, acknowledged, func, *args):
387+
"""Internal legacy write helper."""
388+
publish = monitoring.enabled()
389+
if publish:
390+
start = datetime.datetime.now()
391+
rqst_id, msg, max_size = func(*args)
392+
if publish:
393+
duration = datetime.datetime.now() - start
394+
monitoring.publish_command_start(
395+
command, self.__database.name, rqst_id, sock_info.address)
396+
start = datetime.datetime.now()
397+
try:
398+
result = sock_info.legacy_write(
399+
rqst_id, msg, max_size, acknowledged)
400+
except OperationFailure as exc:
401+
if publish:
402+
duration = (datetime.datetime.now() - start) + duration
403+
details = exc.details
404+
# Succeed if GLE was successful and this is a write error.
405+
# XXX: Is checking if "n" is in details the best way to
406+
# differentiate write errors from something else?
407+
if details.get("ok") and "n" in details:
408+
reply = helpers._upconvert_write_result(
409+
name, command, details)
410+
monitoring.publish_command_success(
411+
duration, reply, name, rqst_id, sock_info.address)
412+
else:
413+
monitoring.publish_command_failure(
414+
duration, details, name, rqst_id, sock_info.address)
415+
raise
416+
if publish:
417+
# No result for w=0
418+
reply = None
419+
if result:
420+
reply = helpers._upconvert_write_result(name, command, result)
421+
duration = (datetime.datetime.now() - start) + duration
422+
monitoring.publish_command_success(
423+
duration, reply, name, rqst_id, sock_info.address)
424+
return result
425+
426+
def _insert_one(
427+
self, sock_info, doc, check_keys, manipulate, write_concern):
428+
"""Internal helper for inserting a single document."""
429+
if manipulate:
430+
doc = self.__database._apply_incoming_manipulators(doc, self)
431+
if '_id' not in doc:
432+
doc['_id'] = ObjectId()
433+
doc = self.__database._apply_incoming_copying_manipulators(doc,
434+
self)
435+
concern = (write_concern or self.write_concern).document
436+
acknowledged = concern.get("w") != 0
437+
command = SON([('insert', self.name),
438+
('documents', [doc]),
439+
('ordered', True)])
440+
if acknowledged and concern:
441+
command['writeConcern'] = concern
442+
443+
if sock_info.max_wire_version > 1 and acknowledged:
444+
# Insert command.
445+
result = sock_info.command(self.__database.name,
446+
command,
447+
codec_options=self.codec_options,
448+
check_keys=check_keys)
449+
_check_write_command_response([(0, result)])
450+
else:
451+
# Legacy OP_INSERT.
452+
self._legacy_write(
453+
sock_info, 'insert', command, acknowledged,
454+
message.insert, self.__full_name, [doc], check_keys,
455+
acknowledged, concern, False, self.codec_options)
456+
return doc.get('_id')
457+
383458
def _insert(self, sock_info, docs, ordered=True,
384459
check_keys=True, manipulate=False, write_concern=None):
385460
"""Internal insert helper."""
386-
return_one = False
387461
if isinstance(docs, collections.MutableMapping):
388-
return_one = True
389-
docs = [docs]
462+
return self._insert_one(
463+
sock_info, docs, check_keys, manipulate, write_concern)
390464

391465
ids = []
392466

@@ -418,7 +492,7 @@ def gen():
418492
safe = concern.get("w") != 0
419493

420494
if sock_info.max_wire_version > 1 and safe:
421-
# Insert command.
495+
# Batched insert command.
422496
command = SON([('insert', self.name),
423497
('ordered', ordered)])
424498

@@ -434,10 +508,7 @@ def gen():
434508
message._do_batched_insert(self.__full_name, gen(), check_keys,
435509
safe, concern, not ordered,
436510
self.codec_options, sock_info)
437-
if return_one:
438-
return ids[0]
439-
else:
440-
return ids
511+
return ids
441512

442513
def insert_one(self, document):
443514
"""Insert a single document.
@@ -508,33 +579,32 @@ def gen():
508579
blk.execute(self.write_concern.document)
509580
return InsertManyResult(inserted_ids, self.write_concern.acknowledged)
510581

511-
def _update(self, sock_info, filter, document, upsert=False,
582+
def _update(self, sock_info, criteria, document, upsert=False,
512583
check_keys=True, multi=False, manipulate=False,
513584
write_concern=None):
514585
"""Internal update / replace helper."""
515-
common.validate_is_mapping("filter", filter)
516586
common.validate_boolean("upsert", upsert)
517587
if manipulate:
518588
document = self.__database._fix_incoming(document, self)
519-
520589
concern = (write_concern or self.write_concern).document
521-
safe = concern.get("w") != 0
522-
523-
if sock_info.max_wire_version > 1 and safe:
590+
acknowledged = concern.get("w") != 0
591+
command = SON([('update', self.name),
592+
('updates', [SON([('q', criteria),
593+
('u', document),
594+
('multi', multi),
595+
('upsert', upsert)])]),
596+
('ordered', True)])
597+
if acknowledged and concern:
598+
command['writeConcern'] = concern
599+
if sock_info.max_wire_version > 1 and acknowledged:
524600
# Update command.
525-
command = SON([('update', self.name)])
526-
if concern:
527-
command['writeConcern'] = concern
528-
529-
docs = [SON([('q', filter), ('u', document),
530-
('multi', multi), ('upsert', upsert)])]
531601

532-
results = message._do_batched_write_command(
533-
self.database.name + '.$cmd', _UPDATE, command,
534-
docs, check_keys, self.codec_options, sock_info)
535-
_check_write_command_response(results)
536-
537-
_, result = results[0]
602+
# The command result has to be published for APM unmodified
603+
# so we make a shallow copy here before adding updatedExisting.
604+
result = sock_info.command(self.__database.name,
605+
command,
606+
codec_options=self.codec_options).copy()
607+
_check_write_command_response([(0, result)])
538608
# Add the updatedExisting field for compatibility.
539609
if result.get('n') and 'upserted' not in result:
540610
result['updatedExisting'] = True
@@ -546,13 +616,12 @@ def _update(self, sock_info, filter, document, upsert=False,
546616
result['upserted'] = result['upserted'][0]['_id']
547617

548618
return result
549-
550619
else:
551620
# Legacy OP_UPDATE.
552-
request_id, msg, max_size = message.update(
553-
self.__full_name, upsert, multi, filter, document, safe,
554-
concern, check_keys, self.codec_options)
555-
return sock_info.legacy_write(request_id, msg, max_size, safe)
621+
return self._legacy_write(
622+
sock_info, 'update', command, acknowledged, message.update,
623+
self.__full_name, upsert, multi, criteria, document,
624+
acknowledged, concern, check_keys, self.codec_options)
556625

557626
def replace_one(self, filter, replacement, upsert=False):
558627
"""Replace a single document matching the filter.
@@ -595,6 +664,7 @@ def replace_one(self, filter, replacement, upsert=False):
595664
596665
.. versionadded:: 3.0
597666
"""
667+
common.validate_is_mapping("filter", filter)
598668
common.validate_ok_for_replace(replacement)
599669
with self._socket_for_writes() as sock_info:
600670
result = self._update(sock_info, filter, replacement, upsert)
@@ -632,6 +702,7 @@ def update_one(self, filter, update, upsert=False):
632702
633703
.. versionadded:: 3.0
634704
"""
705+
common.validate_is_mapping("filter", filter)
635706
common.validate_ok_for_update(update)
636707
with self._socket_for_writes() as sock_info:
637708
result = self._update(sock_info, filter, update,
@@ -670,6 +741,7 @@ def update_many(self, filter, update, upsert=False):
670741
671742
.. versionadded:: 3.0
672743
"""
744+
common.validate_is_mapping("filter", filter)
673745
common.validate_ok_for_update(update)
674746
with self._socket_for_writes() as sock_info:
675747
result = self._update(sock_info, filter, update, upsert,
@@ -686,34 +758,31 @@ def drop(self):
686758
"""
687759
self.__database.drop_collection(self.__name)
688760

689-
def _delete(self, sock_info, filter, multi, write_concern=None):
761+
def _delete(self, sock_info, criteria, multi, write_concern=None):
690762
"""Internal delete helper."""
691-
common.validate_is_mapping("filter", filter)
763+
common.validate_is_mapping("filter", criteria)
692764
concern = (write_concern or self.write_concern).document
693-
safe = concern.get("w") != 0
694-
695-
if sock_info.max_wire_version > 1 and safe:
765+
acknowledged = concern.get("w") != 0
766+
command = SON([('delete', self.name),
767+
('deletes', [SON([('q', criteria),
768+
('limit', int(not multi))])]),
769+
('ordered', True)])
770+
if acknowledged and concern:
771+
command['writeConcern'] = concern
772+
773+
if sock_info.max_wire_version > 1 and acknowledged:
696774
# Delete command.
697-
command = SON([('delete', self.name)])
698-
if concern:
699-
command['writeConcern'] = concern
700-
701-
docs = [SON([('q', filter), ('limit', int(not multi))])]
702-
703-
results = message._do_batched_write_command(
704-
self.database.name + '.$cmd', _DELETE, command,
705-
docs, False, self.codec_options, sock_info)
706-
_check_write_command_response(results)
707-
708-
_, result = results[0]
775+
result = sock_info.command(self.__database.name,
776+
command,
777+
codec_options=self.codec_options)
778+
_check_write_command_response([(0, result)])
709779
return result
710-
711780
else:
712781
# Legacy OP_DELETE.
713-
request_id, msg, max_size = message.delete(
714-
self.__full_name, filter, safe, concern,
782+
return self._legacy_write(
783+
sock_info, 'delete', command, acknowledged, message.delete,
784+
self.__full_name, criteria, acknowledged, concern,
715785
self.codec_options, int(not multi))
716-
return sock_info.legacy_write(request_id, msg, max_size, safe)
717786

718787
def delete_one(self, filter):
719788
"""Delete a single document matching the filter.

pymongo/helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,47 @@ def _check_write_command_response(results):
274274
error.get("errmsg"), error.get("code"), error)
275275

276276

277+
def _upconvert_write_result(operation, command, result):
278+
"""Upconvert a legacy write result to write commmand format."""
279+
280+
# Based on _merge_legacy from bulk.py
281+
affected = result.get("n", 0)
282+
res = {"ok": 1, "n": affected}
283+
errmsg = result.get("errmsg", result.get("err", ""))
284+
if errmsg:
285+
# The write was successful on at least the primary so don't return.
286+
if result.get("wtimeout"):
287+
res["writeConcernError"] = {"errmsg": errmsg,
288+
"code": 64,
289+
"errInfo": {"wtimeout": True}}
290+
else:
291+
# The write failed.
292+
error = {"index": 0,
293+
"code": result.get("code", 8),
294+
"errmsg": errmsg}
295+
if "errInfo" in result:
296+
error["errInfo"] = result["errInfo"]
297+
res["writeErrors"] = [error]
298+
return res
299+
if operation == "insert":
300+
# GLE result for insert is always 0 in most MongoDB versions.
301+
res["n"] = 1
302+
elif operation == "update":
303+
res["nModified"] = 0
304+
if "upserted" in result:
305+
res["upserted"] = [{"index": 0, "_id": result["upserted"]}]
306+
# Versions of MongoDB before 2.6 don't return the _id for an
307+
# upsert if _id is not an ObjectId.
308+
elif result.get("updatedExisting") is False and affected == 1:
309+
# If _id is in both the update document *and* the query spec
310+
# the update document _id takes precedence.
311+
_id = command["u"].get("_id", command["q"].get("_id"))
312+
res["upserted"] = [{"index": 0, "_id": _id}]
313+
else:
314+
res["nModified"] = affected
315+
return res
316+
317+
277318
def _fields_list_to_dict(fields, option_name):
278319
"""Takes a sequence of field names and returns a matching dictionary.
279320

pymongo/message.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,7 @@ def __pack_message(operation, data):
258258

259259
def insert(collection_name, docs, check_keys,
260260
safe, last_error_args, continue_on_error, opts):
261-
"""Get an **insert** message.
262-
263-
Used by the Bulk API to insert into pre-2.6 servers. Collection.insert
264-
uses _do_batched_insert.
265-
"""
261+
"""Get an **insert** message."""
266262
options = 0
267263
if continue_on_error:
268264
options += 1
@@ -314,14 +310,14 @@ def update(collection_name, upsert, multi,
314310

315311

316312
def query(options, collection_name, num_to_skip,
317-
num_to_return, query, field_selector, opts):
313+
num_to_return, query, field_selector, opts, check_keys=False):
318314
"""Get a **query** message.
319315
"""
320316
data = struct.pack("<I", options)
321317
data += bson._make_c_string(collection_name)
322318
data += struct.pack("<i", num_to_skip)
323319
data += struct.pack("<i", num_to_return)
324-
encoded = bson.BSON.encode(query, False, opts)
320+
encoded = bson.BSON.encode(query, check_keys, opts)
325321
data += encoded
326322
max_bson_size = len(encoded)
327323
if field_selector is not None:

0 commit comments

Comments
 (0)