Skip to content

Commit 393ed35

Browse files
Luke Lovettbehackett
authored andcommitted
PYTHON-981 - Apply collection-level read concern to *_map_reduce and aggregate when appropriate.
1 parent f5b44ea commit 393ed35

File tree

2 files changed

+77
-5
lines changed

2 files changed

+77
-5
lines changed

pymongo/collection.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,8 +1642,18 @@ def aggregate(self, pipeline, **kwargs):
16421642

16431643
cmd.update(kwargs)
16441644

1645-
result = self._command(sock_info, cmd, slave_ok,
1646-
read_concern=self.read_concern)
1645+
# Apply this Collection's read concern if $out is not in the
1646+
# pipeline.
1647+
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
1648+
for stage in cmd['pipeline']:
1649+
if '$out' in stage:
1650+
result = self._command(sock_info, cmd, slave_ok)
1651+
break
1652+
else:
1653+
result = self._command(sock_info, cmd, slave_ok,
1654+
read_concern=self.read_concern)
1655+
else:
1656+
result = self._command(sock_info, cmd, slave_ok)
16471657

16481658
if "cursor" in result:
16491659
cursor = result["cursor"]
@@ -1826,8 +1836,14 @@ def map_reduce(self, map, reduce, out, full_response=False, **kwargs):
18261836
cmd.update(kwargs)
18271837

18281838
with self._socket_for_primary_reads() as (sock_info, slave_ok):
1829-
response = self._command(
1830-
sock_info, cmd, slave_ok, ReadPreference.PRIMARY)
1839+
if (sock_info.max_wire_version >= 4 and 'readConcern' not in cmd and
1840+
'inline' in cmd['out']):
1841+
response = self._command(
1842+
sock_info, cmd, slave_ok, ReadPreference.PRIMARY,
1843+
read_concern=self.read_concern)
1844+
else:
1845+
response = self._command(
1846+
sock_info, cmd, slave_ok, ReadPreference.PRIMARY)
18311847

18321848
if full_response or not response.get('result'):
18331849
return response
@@ -1869,6 +1885,11 @@ def inline_map_reduce(self, map, reduce, full_response=False, **kwargs):
18691885
("out", {"inline": 1})])
18701886
cmd.update(kwargs)
18711887
with self._socket_for_reads() as (sock_info, slave_ok):
1888+
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
1889+
res = self._command(sock_info, cmd, slave_ok,
1890+
read_concern=self.read_concern)
1891+
else:
1892+
res = self._command(sock_info, cmd, slave_ok)
18721893
res = self._command(sock_info, cmd, slave_ok)
18731894

18741895
if full_response:

test/test_read_concern.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from bson.son import SON
2020
from pymongo import monitoring
21-
from pymongo.errors import ConfigurationError
21+
from pymongo.errors import ConfigurationError, OperationFailure
2222
from pymongo.read_concern import ReadConcern
2323

2424
from test import client_context, pair, unittest
@@ -110,3 +110,54 @@ def test_command_cursor(self):
110110
self.assertEqual(
111111
{'level': 'local'},
112112
self.listener.results['started'][0].command['readConcern'])
113+
114+
def test_aggregate_out(self):
115+
coll = self.db.get_collection('coll', read_concern=ReadConcern('local'))
116+
try:
117+
tuple(coll.aggregate([{'$match': {'field': 'value'}},
118+
{'$out': 'output_collection'}]))
119+
except OperationFailure:
120+
# "ns doesn't exist"
121+
pass
122+
self.assertNotIn('readConcern',
123+
self.listener.results['started'][0].command)
124+
125+
def test_map_reduce_out(self):
126+
coll = self.db.get_collection('coll', read_concern=ReadConcern('local'))
127+
try:
128+
tuple(coll.map_reduce('function() { emit(this._id, this.value); }',
129+
'function(key, values) { return 42; }',
130+
out='output_collection'))
131+
except OperationFailure:
132+
# "ns doesn't exist"
133+
pass
134+
self.assertNotIn('readConcern',
135+
self.listener.results['started'][0].command)
136+
137+
if client_context.version.at_least(3, 1, 9, -1):
138+
self.listener.results.clear()
139+
try:
140+
tuple(coll.map_reduce(
141+
'function() { emit(this._id, this.value); }',
142+
'function(key, values) { return 42; }',
143+
out={'inline': 1}))
144+
except OperationFailure:
145+
# "ns doesn't exist"
146+
pass
147+
self.assertEqual(
148+
{'level': 'local'},
149+
self.listener.results['started'][0].command['readConcern'])
150+
151+
@client_context.require_version_min(3, 1, 9, -1)
152+
def test_inline_map_reduce(self):
153+
coll = self.db.get_collection('coll', read_concern=ReadConcern('local'))
154+
try:
155+
tuple(coll.inline_map_reduce(
156+
'function() { emit(this._id, this.value); }',
157+
'function(key, values) { return 42; }'))
158+
except OperationFailure:
159+
# "ns doesn't exist"
160+
pass
161+
self.assertEqual(
162+
{'level': 'local'},
163+
self.listener.results['started'][0].command['readConcern'])

0 commit comments

Comments
 (0)