Skip to content

Commit 1a3aeea

Browse files
aherlihy authored and behackett committed
PYTHON-978 - Use find/getMore commands
1 parent 01eb25b commit 1a3aeea

File tree

11 files changed

+132
-61
lines changed

11 files changed

+132
-61
lines changed

pymongo/collection.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,9 +1360,10 @@ def list_indexes(self):
13601360
else:
13611361
namespace = _UJOIN % (self.__database.name, "system.indexes")
13621362
res = helpers._first_batch(
1363-
sock_info, namespace, {"ns": self.__full_name},
1364-
0, slave_ok, codec_options, ReadPreference.PRIMARY,
1365-
cmd, self.database.client._event_listeners)
1363+
sock_info, self.__database.name, "system.indexes",
1364+
{"ns": self.__full_name}, 0, slave_ok, codec_options,
1365+
ReadPreference.PRIMARY, cmd,
1366+
self.database.client._event_listeners)
13661367
data = res["data"]
13671368
cursor = {
13681369
"id": res["cursor_id"],

pymongo/command_cursor.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def __send_message(self, operation):
112112
doc = helpers._unpack_response(response.data,
113113
self.__id,
114114
self.__collection.codec_options)
115+
# TODO: check_command_response when getMore works with agg cursor
115116
except OperationFailure as exc:
116117
self.__killed = True
117118

@@ -170,7 +171,12 @@ def _refresh(self):
170171

171172
if self.__id: # Get More
172173
self.__send_message(
173-
_GetMore(self.__ns, self.__batch_size, self.__id))
174+
_GetMore(self.__collection.database.name,
175+
self.__collection.name,
176+
self.__batch_size,
177+
self.__id,
178+
self.__collection.codec_options,
179+
cmd_cursor=True))
174180

175181
else: # Cursor id is zero nothing else to return
176182
self.__killed = True

pymongo/cursor.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,7 @@ def __send_message(self, operation):
805805
client = self.__collection.database.client
806806
listeners = client._event_listeners
807807
publish = listeners.enabled_for_commands
808+
from_command = False
808809

809810
if operation:
810811
kwargs = {
@@ -827,6 +828,7 @@ def __send_message(self, operation):
827828
data = response.data
828829
cmd_duration = response.duration
829830
rqst_id = response.request_id
831+
from_command = response.from_command
830832
except AutoReconnect:
831833
# Don't try to send kill cursors on another socket
832834
# or to another server. It can cause a _pinValue
@@ -869,6 +871,8 @@ def __send_message(self, operation):
869871
doc = helpers._unpack_response(response=data,
870872
cursor_id=self.__id,
871873
codec_options=self.__codec_options)
874+
if from_command:
875+
helpers._check_command_response(doc['data'][0])
872876
except OperationFailure as exc:
873877
self.__killed = True
874878

@@ -913,7 +917,9 @@ def __send_message(self, operation):
913917
if publish:
914918
duration = (datetime.datetime.now() - start) + cmd_duration
915919
# Must publish in find / getMore / explain command response format.
916-
if cmd_name == "explain":
920+
if from_command:
921+
res = doc['data'][0]
922+
elif cmd_name == "explain":
917923
res = doc["data"][0] if doc["number_returned"] else {}
918924
else:
919925
res = {"cursor": {"id": doc["cursor_id"],
@@ -926,12 +932,23 @@ def __send_message(self, operation):
926932
listeners.publish_command_success(
927933
duration, res, cmd_name, rqst_id, self.__address)
928934

929-
self.__id = doc["cursor_id"]
935+
if from_command and cmd_name != "explain":
936+
cursor = doc['data'][0]['cursor']
937+
self.__id = cursor['id']
938+
if cmd_name == 'find':
939+
documents = cursor['firstBatch']
940+
else:
941+
documents = cursor['nextBatch']
942+
self.__data = deque(documents)
943+
self.__retrieved += len(documents)
944+
else:
945+
self.__id = doc["cursor_id"]
946+
self.__data = deque(doc["data"])
947+
self.__retrieved += doc["number_returned"]
948+
930949
if self.__id == 0:
931950
self.__killed = True
932951

933-
self.__retrieved += doc["number_returned"]
934-
self.__data = deque(doc["data"])
935952

936953
if self.__limit and self.__id and self.__limit <= self.__retrieved:
937954
self.__die()
@@ -959,7 +976,8 @@ def _refresh(self):
959976
else:
960977
ntoreturn = self.__limit
961978
self.__send_message(_Query(self.__query_flags,
962-
self.__collection.full_name,
979+
self.__collection.database.name,
980+
self.__collection.name,
963981
self.__skip,
964982
ntoreturn,
965983
self.__query_spec(),
@@ -982,9 +1000,11 @@ def _refresh(self):
9821000
if self.__exhaust:
9831001
self.__send_message(None)
9841002
else:
985-
self.__send_message(_GetMore(self.__collection.full_name,
1003+
self.__send_message(_GetMore(self.__collection.database.name,
1004+
self.__collection.name,
9861005
limit,
9871006
self.__id,
1007+
self.__codec_options,
9881008
self.__max_time_ms))
9891009

9901010
else: # Cursor id is zero nothing else to return

pymongo/database.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ def _list_collections(self, sock_info, slave_okay, criteria=None):
466466
return CommandCursor(coll, cursor, sock_info.address)
467467
else:
468468
coll = self["system.namespaces"]
469-
res = _first_batch(sock_info, coll.full_name,
469+
res = _first_batch(sock_info, coll.database.name, coll.name,
470470
criteria, 0, slave_okay,
471471
CodecOptions(), ReadPreference.PRIMARY, cmd,
472472
self.client._event_listeners)
@@ -598,7 +598,7 @@ def current_op(self, include_all=False):
598598
return sock_info.command("admin", cmd)
599599
else:
600600
spec = {"$all": True} if include_all else {}
601-
x = helpers._first_batch(sock_info, "admin.$cmd.sys.inprog",
601+
x = helpers._first_batch(sock_info, "admin", "$cmd.sys.inprog",
602602
spec, -1, True, self.codec_options,
603603
ReadPreference.PRIMARY, cmd, self.client._event_listeners)
604604
return x.get('data', [None])[0]

pymongo/helpers.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,11 @@ def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
126126
error_object.get("code"),
127127
error_object)
128128

129-
result = {}
130-
result["cursor_id"] = struct.unpack("<q", response[4:12])[0]
131-
result["starting_from"] = struct.unpack("<i", response[12:16])[0]
132-
result["number_returned"] = struct.unpack("<i", response[16:20])[0]
133-
result["data"] = bson.decode_all(response[20:], codec_options)
129+
result = {"cursor_id": struct.unpack("<q", response[4:12])[0],
130+
"starting_from": struct.unpack("<i", response[12:16])[0],
131+
"number_returned": struct.unpack("<i", response[16:20])[0],
132+
"data": bson.decode_all(response[20:], codec_options)}
133+
134134
assert len(result["data"]) == result["number_returned"]
135135
return result
136136

@@ -188,6 +188,8 @@ def _check_command_response(response, msg=None, allowable_errors=None):
188188
raise DuplicateKeyError(errmsg, code, response)
189189
elif code == 50:
190190
raise ExecutionTimeout(errmsg, code, response)
191+
elif code == 43:
192+
raise CursorNotFound(errmsg, code, response)
191193

192194
msg = msg or "%s"
193195
raise OperationFailure(msg % errmsg, code, response)
@@ -233,11 +235,11 @@ def _check_gle_response(response):
233235
raise OperationFailure(details["err"], code, result)
234236

235237

236-
def _first_batch(sock_info, namespace, query, ntoreturn,
238+
def _first_batch(sock_info, db, coll, query, ntoreturn,
237239
slave_ok, codec_options, read_preference, cmd, listeners):
238240
"""Simple query helper for retrieving a first (and possibly only) batch."""
239241
query = _Query(
240-
0, namespace, 0, ntoreturn, query, None,
242+
0, db, coll, 0, ntoreturn, query, None,
241243
codec_options, read_preference, 0, ntoreturn)
242244

243245
name = next(iter(cmd))
@@ -252,7 +254,7 @@ def _first_batch(sock_info, namespace, query, ntoreturn,
252254
if publish:
253255
encoding_duration = datetime.datetime.now() - start
254256
listeners.publish_command_start(
255-
cmd, namespace.split('.', 1)[0], request_id, sock_info.address)
257+
cmd, db, request_id, sock_info.address)
256258
start = datetime.datetime.now()
257259

258260
sock_info.send_message(msg, max_doc_size)

pymongo/message.py

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
import bson
2828
from bson.codec_options import DEFAULT_CODEC_OPTIONS
29-
from bson.py3compat import b, StringIO
29+
from bson.py3compat import b, StringIO, u
3030
from bson.son import SON
3131
try:
3232
from pymongo import _cmessage
@@ -60,6 +60,8 @@
6060
_DELETE: b'\x04deletes\x00\x00\x00\x00\x00',
6161
}
6262

63+
_UJOIN = u("%s.%s")
64+
6365

6466
def _randint():
6567
"""Generate a pseudo random 32 bit integer."""
@@ -201,14 +203,15 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_time_ms):
201203
class _Query(object):
202204
"""A query operation."""
203205

204-
__slots__ = ('flags', 'ns', 'ntoskip', 'ntoreturn', 'spec', 'fields',
205-
'codec_options', 'read_preference', 'limit', 'batch_size',
206-
'name')
206+
__slots__ = ('flags', 'db', 'coll', 'ntoskip', 'ntoreturn', 'spec',
207+
'fields', 'codec_options', 'read_preference', 'limit',
208+
'batch_size', 'name')
207209

208-
def __init__(self, flags, ns, ntoskip, ntoreturn, spec, fields,
210+
def __init__(self, flags, db, coll, ntoskip, ntoreturn, spec, fields,
209211
codec_options, read_preference, limit, batch_size):
210212
self.flags = flags
211-
self.ns = ns
213+
self.db = db
214+
self.coll = coll
212215
self.ntoskip = ntoskip
213216
self.ntoreturn = ntoreturn
214217
self.spec = spec
@@ -224,51 +227,77 @@ def as_command(self):
224227
225228
Should be called *after* get_message.
226229
"""
227-
dbn, coll = self.ns.split('.', 1)
228230
if '$explain' in self.spec:
229231
self.name = 'explain'
230232
return _gen_explain_command(
231-
coll, self.spec, self.fields, self.ntoskip,
232-
self.limit, self.batch_size, self.flags), dbn
233-
return _gen_find_command(coll, self.spec, self.fields, self.ntoskip,
234-
self.limit, self.batch_size, self.flags), dbn
233+
self.coll, self.spec, self.fields, self.ntoskip,
234+
self.limit, self.batch_size, self.flags), self.db
235+
return _gen_find_command(
236+
self.coll, self.spec, self.fields, self.ntoskip, self.limit,
237+
self.batch_size, self.flags), self.db
235238

236-
def get_message(self, set_slave_ok, is_mongos):
239+
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
237240
"""Get a query message, possibly setting the slaveOk bit."""
238-
if is_mongos:
239-
self.spec = _maybe_add_read_preference(self.spec,
240-
self.read_preference)
241241
if set_slave_ok:
242242
# Set the slaveOk bit.
243243
flags = self.flags | 4
244244
else:
245245
flags = self.flags
246-
return query(flags, self.ns, self.ntoskip, self.ntoreturn,
247-
self.spec, self.fields, self.codec_options)
246+
247+
ns = _UJOIN % (self.db, self.coll)
248+
spec = self.spec
249+
ntoreturn = self.ntoreturn
250+
251+
if use_cmd:
252+
ns = _UJOIN % (self.db, "$cmd")
253+
spec = self.as_command()[0]
254+
ntoreturn = -1 # All DB commands return 1 document
255+
256+
if is_mongos:
257+
spec = _maybe_add_read_preference(spec,
258+
self.read_preference)
259+
260+
return query(flags, ns, self.ntoskip, ntoreturn,
261+
spec, self.fields, self.codec_options)
248262

249263

250264
class _GetMore(object):
251265
"""A getmore operation."""
252266

253-
__slots__ = ('ns', 'ntoreturn', 'cursor_id', 'max_time_ms')
267+
__slots__ = ('db', 'coll', 'ntoreturn', 'cursor_id', 'max_time_ms',
268+
'codec_options', 'cmd_cursor')
254269

255270
name = 'getMore'
256271

257-
def __init__(self, ns, ntoreturn, cursor_id, max_time_ms=None):
258-
self.ns = ns
272+
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
273+
max_time_ms=None, cmd_cursor=False):
274+
self.db = db
275+
self.coll = coll
259276
self.ntoreturn = ntoreturn
260277
self.cursor_id = cursor_id
278+
self.codec_options = codec_options
261279
self.max_time_ms = max_time_ms
280+
# XXX: Temporarily keep track of whether this getMore is for a command cursor
281+
# so we can use OP_KILLCURSORS until find support for mongos is completed.
282+
self.cmd_cursor = cmd_cursor
262283

263284
def as_command(self):
264285
"""Return a getMore command document for this query."""
265-
dbn, coll = self.ns.split('.', 1)
266-
return _gen_get_more_command(
267-
self.cursor_id, coll, self.ntoreturn, self.max_time_ms), dbn
286+
return _gen_get_more_command(self.cursor_id, self.coll,
287+
self.ntoreturn, self.max_time_ms), self.db
268288

269-
def get_message(self, dummy0, dummy1):
289+
def get_message(self, dummy0, dummy1, use_cmd=False):
270290
"""Get a getmore message."""
271-
return get_more(self.ns, self.ntoreturn, self.cursor_id)
291+
292+
ns = _UJOIN % (self.db, self.coll)
293+
294+
if use_cmd and not self.cmd_cursor:
295+
ns = _UJOIN % (self.db, "$cmd")
296+
spec = self.as_command()[0]
297+
298+
return query(4, ns, 0, -1, spec, None, self.codec_options)
299+
300+
return get_more(ns, self.ntoreturn, self.cursor_id)
272301

273302

274303
class _CursorAddress(tuple):

pymongo/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ def unlock(self):
11021102
if exc.code != 125:
11031103
raise
11041104
else:
1105-
helpers._first_batch(sock_info, "admin.$cmd.sys.unlock",
1105+
helpers._first_batch(sock_info, "admin", "$cmd.sys.unlock",
11061106
{}, -1, True, self.codec_options,
11071107
ReadPreference.PRIMARY, cmd, self._event_listeners)
11081108

pymongo/response.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,24 @@
1616

1717

1818
class Response(object):
19-
__slots__ = ('_data', '_address', '_request_id', '_duration')
19+
__slots__ = ('_data', '_address', '_request_id', '_duration',
20+
'_from_command')
2021

21-
def __init__(self, data, address, request_id, duration):
22+
def __init__(self, data, address, request_id, duration, from_command):
2223
"""Represent a response from the server.
2324
2425
:Parameters:
2526
- `data`: Raw BSON bytes.
2627
- `address`: (host, port) of the source server.
2728
- `request_id`: The request id of this operation.
2829
- `duration`: The duration of the operation.
30+
- `from_command`: If the response is the result of a db command.
2931
"""
3032
self._data = data
3133
self._address = address
3234
self._request_id = request_id
3335
self._duration = duration
36+
self._from_command = from_command
3437

3538
@property
3639
def data(self):
@@ -52,12 +55,17 @@ def duration(self):
5255
"""The duration of the operation."""
5356
return self._duration
5457

58+
@property
59+
def from_command(self):
60+
"""If the response is a result from a db command."""
61+
return self._from_command
62+
5563

5664
class ExhaustResponse(Response):
5765
__slots__ = ('_socket_info', '_pool')
5866

59-
def __init__(
60-
self, data, address, socket_info, pool, request_id, duration):
67+
def __init__(self, data, address, socket_info, pool, request_id, duration,
68+
from_command):
6169
"""Represent a response to an exhaust cursor's initial query.
6270
6371
:Parameters:
@@ -67,11 +75,13 @@ def __init__(
6775
- `pool`: The Pool from which the SocketInfo came.
6876
- `request_id`: The request id of this operation.
6977
- `duration`: The duration of the operation.
78+
- `from_command`: If the response is the result of a db command.
7079
"""
7180
super(ExhaustResponse, self).__init__(data,
7281
address,
7382
request_id,
74-
duration)
83+
duration,
84+
from_command)
7585
self._socket_info = socket_info
7686
self._pool = pool
7787

0 commit comments

Comments
 (0)