Skip to content

Commit 6571f8b

Browse files
committed
PYTHON-207 Update M/R API for server 1.8.0
The map_reduce method now requires an 'out' parameter. Clients that were already passing out="<collection name>" will continue to function correctly. Also added merge_output and reduce_output options which will only work correctly with server >= 1.7.4. This change also adds an inline_map_reduce method that also only works with server >= 1.7.4.
1 parent 8152c88 commit 6571f8b

File tree

2 files changed

+100
-4
lines changed

2 files changed

+100
-4
lines changed

pymongo/collection.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from pymongo import (helpers,
2222
message)
2323
from pymongo.cursor import Cursor
24-
from pymongo.errors import InvalidName
24+
from pymongo.errors import InvalidName, InvalidOperation
2525

2626
_ZERO = "\x00\x00\x00\x00"
2727

@@ -914,7 +914,8 @@ def distinct(self, key):
914914
"""
915915
return self.find().distinct(key)
916916

917-
def map_reduce(self, map, reduce, full_response=False, **kwargs):
917+
def map_reduce(self, map, reduce, out, merge_output=False,
918+
reduce_output=False, full_response=False, **kwargs):
918919
"""Perform a map/reduce operation on this collection.
919920
920921
If `full_response` is ``False`` (default) returns a
@@ -925,6 +926,15 @@ def map_reduce(self, map, reduce, full_response=False, **kwargs):
925926
:Parameters:
926927
- `map`: map function (as a JavaScript string)
927928
- `reduce`: reduce function (as a JavaScript string)
929+
- `out` (required): output collection name
930+
- `merge_output` (optional): Merge output into `out`. If the same
931+
key exists in both the result set and the existing output collection,
932+
the new key will overwrite the existing key
933+
- `reduce_output` (optional): If documents exist for a given key
934+
in the result set and in the existing output collection, then a
935+
reduce operation (using the specified reduce function) will be
936+
performed on the two values and the result will be written to
937+
the output collection
928938
- `full_response` (optional): if ``True``, return full response to
929939
this command - otherwise just return the result collection
930940
- `**kwargs` (optional): additional arguments to the
@@ -943,11 +953,62 @@ def map_reduce(self, map, reduce, full_response=False, **kwargs):
943953
944954
.. mongodoc:: mapreduce
945955
"""
956+
if not isinstance(out, basestring):
957+
raise TypeError("'out' must be an instance of basestring")
958+
959+
if merge_output and reduce_output:
960+
raise InvalidOperation("Can't do both merge and re-reduce of output.")
961+
962+
if merge_output:
963+
out_conf = {"merge": out}
964+
elif reduce_output:
965+
out_conf = {"reduce": out}
966+
else:
967+
out_conf = out
968+
969+
response = self.__database.command("mapreduce", self.__name,
970+
map=map, reduce=reduce,
971+
out=out_conf, **kwargs)
972+
if full_response:
973+
return response
974+
else:
975+
return self.__database[response["result"]]
976+
977+
def inline_map_reduce(self, map, reduce, full_response=False, **kwargs):
978+
"""Perform an inline map/reduce operation on this collection.
979+
980+
Perform the map/reduce operation on the server in RAM. A result
981+
collection is not created. The result set is returned as a list
982+
of documents.
983+
984+
If `full_response` is ``False`` (default) returns the
985+
result documents in a list. Otherwise, returns the full
986+
response from the server to the `map reduce command`_.
987+
988+
:Parameters:
989+
- `map`: map function (as a JavaScript string)
990+
- `reduce`: reduce function (as a JavaScript string)
991+
- `full_response` (optional): if ``True``, return full response to
992+
this command - otherwise just return the result collection
993+
- `**kwargs` (optional): additional arguments to the
994+
`map reduce command`_ may be passed as keyword arguments to this
995+
helper method, e.g.::
996+
997+
>>> db.test.map_reduce(map, reduce, limit=2)
998+
999+
.. note:: Requires server version **>= 1.7.4**
1000+
1001+
.. versionadded:: 1.10
1002+
"""
1003+
9461004
response = self.__database.command("mapreduce", self.__name,
947-
map=map, reduce=reduce, **kwargs)
1005+
map=map, reduce=reduce,
1006+
out={"inline": 1}, **kwargs)
1007+
9481008
if full_response:
9491009
return response
950-
return self.__database[response["result"]]
1010+
else:
1011+
return response.get("results")
9511012

9521013
def find_and_modify(self, query={}, update=None, upsert=False, **kwargs):
9531014
"""Update and return an object.

test/test_collection.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,6 +1034,31 @@ def test_map_reduce(self):
10341034
self.assertEqual(2, result.find_one({"_id": "dog"})["value"])
10351035
self.assertEqual(1, result.find_one({"_id": "mouse"})["value"])
10361036

1037+
if version.at_least(self.db.connection, (1, 7, 4)):
1038+
db.test.insert({"id": 5, "tags": ["hampster"]})
1039+
result = db.test.map_reduce(map, reduce, out='mrunittests')
1040+
self.assertEqual(1, result.find_one({"_id": "hampster"})["value"])
1041+
db.test.remove({"id": 5})
1042+
result = db.test.map_reduce(map, reduce,
1043+
out='mrunittests', merge_output=True)
1044+
self.assertEqual(3, result.find_one({"_id": "cat"})["value"])
1045+
self.assertEqual(1, result.find_one({"_id": "hampster"})["value"])
1046+
1047+
result = db.test.map_reduce(map, reduce,
1048+
out='mrunittests', reduce_output=True)
1049+
self.assertEqual(6, result.find_one({"_id": "cat"})["value"])
1050+
self.assertEqual(4, result.find_one({"_id": "dog"})["value"])
1051+
self.assertEqual(2, result.find_one({"_id": "mouse"})["value"])
1052+
self.assertEqual(1, result.find_one({"_id": "hampster"})["value"])
1053+
1054+
self.assertRaises(InvalidOperation,
1055+
db.test.map_reduce,
1056+
map,
1057+
reduce,
1058+
out='mrunittests',
1059+
merge_output=True,
1060+
reduce_output=True)
1061+
10371062
full_result = db.test.map_reduce(map, reduce,
10381063
out='mrunittests', full_response=True)
10391064
self.assertEqual(6, full_result["counts"]["emit"])
@@ -1043,6 +1068,16 @@ def test_map_reduce(self):
10431068
self.assertEqual(1, result.find_one({"_id": "dog"})["value"])
10441069
self.assertEqual(None, result.find_one({"_id": "mouse"}))
10451070

1071+
if version.at_least(self.db.connection, (1, 7, 4)):
1072+
result = db.test.inline_map_reduce(map, reduce)
1073+
self.assertTrue(isinstance(result, list))
1074+
self.assertEqual(3, len(result))
1075+
self.assertTrue(result[1]["_id"] in ("cat", "dog", "mouse"))
1076+
1077+
full_result = db.test.inline_map_reduce(map, reduce,
1078+
full_response=True)
1079+
self.assertEqual(6, full_result["counts"]["emit"])
1080+
10461081
def test_messages_with_unicode_collection_names(self):
10471082
db = self.db
10481083

0 commit comments

Comments
 (0)