Skip to content

Commit 3a6d0ae

Browse files
author
Mike Dirolf
committed
add tailable option to find() to use a tailable cursor
1 parent f181a1a commit 3a6d0ae

File tree

3 files changed

+56
-9
lines changed

3 files changed

+56
-9
lines changed

pymongo/collection.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ def _fields_list_to_dict(self, fields):
305305
return as_dict
306306

307307
def find(self, spec=None, fields=None, skip=0, limit=0,
308-
slave_okay=None, timeout=True, snapshot=False,
308+
slave_okay=None, timeout=True, snapshot=False, tailable=False,
309309
_sock=None, _must_use_master=False):
310310
"""Query the database.
311311
@@ -342,9 +342,15 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
342342
- `snapshot` (optional): if True, snapshot mode will be used for this
343343
query. Snapshot mode assures no duplicates are returned, or objects
344344
missed, which were present at both the start and end of the query's
345-
execution. For details, see the wiki_
346-
347-
.. _wiki: http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database
345+
execution. For details, see the `wiki
346+
<http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database>`_.
347+
- `tailable` (optional): the result of this find call will be a
348+
tailable cursor - tailable cursors aren't closed when the last data
349+
is retrieved but are kept open and the cursors location marks the
350+
final document's position. if more data is received iteration of
351+
the cursor will continue from the last document received. For
352+
details, see the `wiki
353+
<http://www.mongodb.org/display/DOCS/Tailable+Cursors>`_.
348354
"""
349355
if spec is None:
350356
spec = SON()
@@ -370,14 +376,17 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
370376
raise TypeError("timeout must be an instance of bool")
371377
if not isinstance(snapshot, types.BooleanType):
372378
raise TypeError("snapshot must be an instance of bool")
379+
if not isinstance(tailable, types.BooleanType):
380+
raise TypeError("tailable must be an instance of bool")
373381

374382
if fields is not None:
375383
if not fields:
376384
fields = ["_id"]
377385
fields = self._fields_list_to_dict(fields)
378386

379-
return Cursor(self, spec, fields, skip, limit, slave_okay, timeout, snapshot,
380-
_sock=_sock, _must_use_master=_must_use_master)
387+
return Cursor(self, spec, fields, skip, limit, slave_okay, timeout,
388+
tailable, snapshot, _sock=_sock,
389+
_must_use_master=_must_use_master)
381390

382391
def count(self):
383392
"""Get the number of documents in this collection.

pymongo/cursor.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class Cursor(object):
3737
"""
3838

3939
def __init__(self, collection, spec, fields, skip, limit, slave_okay,
40-
timeout, snapshot=False, _sock=None, _must_use_master=False):
40+
timeout, tailable, snapshot=False,
41+
_sock=None, _must_use_master=False):
4142
"""Create a new cursor.
4243
4344
Should not be called directly by application developers.
@@ -49,6 +50,7 @@ def __init__(self, collection, spec, fields, skip, limit, slave_okay,
4950
self.__limit = limit
5051
self.__slave_okay = slave_okay
5152
self.__timeout = timeout
53+
self.__tailable = tailable
5254
self.__snapshot = snapshot
5355
self.__ordering = None
5456
self.__explain = False
@@ -99,7 +101,7 @@ def clone(self):
99101
"""
100102
copy = Cursor(self.__collection, self.__spec, self.__fields,
101103
self.__skip, self.__limit, self.__slave_okay,
102-
self.__timeout, self.__snapshot)
104+
self.__timeout, self.__tailable, self.__snapshot)
103105
copy.__ordering = self.__ordering
104106
copy.__explain = self.__explain
105107
copy.__hint = self.__hint
@@ -141,6 +143,8 @@ def __query_options(self):
141143
"""Get the 4 byte query options string to use for this query.
142144
"""
143145
options = 0
146+
if self.__tailable:
147+
options |= _QUERY_OPTIONS["tailable_cursor"]
144148
if self.__slave_okay:
145149
options |= _QUERY_OPTIONS["slave_okay"]
146150
if not self.__timeout:
@@ -381,7 +385,11 @@ def __send_message(self, operation, message):
381385

382386
response = pymongo.Connection._unpack_response(response, self.__id)
383387
self.__id = response["cursor_id"]
384-
assert response["starting_from"] == self.__retrieved
388+
389+
# starting from doesn't get set on getmore's for tailable cursors
390+
if not self.__tailable:
391+
assert response["starting_from"] == self.__retrieved
392+
385393
self.__retrieved += response["number_returned"]
386394
self.__data = response["data"]
387395

test/test_cursor.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,36 @@ def set_coll():
555555

556556
self.assertRaises(AttributeError, set_coll)
557557

558+
def test_tailable(self):
559+
db = self.db
560+
db.drop_collection("test")
561+
562+
563+
cursor = db.test.find(tailable=True)
564+
565+
db.test.insert({"x": 1})
566+
count = 0
567+
for doc in cursor:
568+
count += 1
569+
self.assertEqual(1, doc["x"])
570+
self.assertEqual(1, count)
571+
572+
db.test.insert({"x": 2})
573+
count = 0
574+
for doc in cursor:
575+
count += 1
576+
self.assertEqual(2, doc["x"])
577+
self.assertEqual(1, count)
578+
579+
db.test.insert({"x": 3})
580+
count = 0
581+
for doc in cursor:
582+
count += 1
583+
self.assertEqual(3, doc["x"])
584+
self.assertEqual(1, count)
585+
586+
self.assertEqual(3, db.test.count())
587+
558588

559589
if __name__ == "__main__":
560590
unittest.main()

0 commit comments

Comments
 (0)