Skip to content

Commit 7425153

Browse files
committed
PYTHON-1665 Agg with $out always goes to primary
1 parent a23ce28 commit 7425153

File tree

5 files changed

+26
-18
lines changed

5 files changed

+26
-18
lines changed

doc/changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include:
5151
:mod:`~pymongo.monitoring` for an example.
5252
- :meth:`pymongo.collection.Collection.aggregate` and
5353
:meth:`pymongo.database.Database.aggregate` now support the ``$merge`` pipeline
54+
stage and use read preference
55+
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` if the ``$out`` or
56+
``$merge`` pipeline stages are used.
5457
- Support for specifying a pipeline or document in
5558
:meth:`~pymongo.collection.Collection.update_one`,
5659
:meth:`~pymongo.collection.Collection.update_many`,

pymongo/aggregation.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from pymongo import common
2020
from pymongo.collation import validate_collation_or_none
2121
from pymongo.errors import ConfigurationError
22+
from pymongo.read_preferences import ReadPreference
2223

2324

2425
class _AggregationCommand(object):
@@ -97,6 +98,11 @@ def _process_result(self, result, session, server, sock_info, slave_ok):
9798
self._result_processor(
9899
result, session, server, sock_info, slave_ok)
99100

101+
def get_read_preference(self, session):
102+
if self._performs_write:
103+
return ReadPreference.PRIMARY
104+
return self._target._read_preference_for(session)
105+
100106
def get_cursor(self, session, server, sock_info, slave_ok):
101107
# Ensure command compatibility.
102108
self._check_compat(sock_info)
@@ -106,9 +112,6 @@ def get_cursor(self, session, server, sock_info, slave_ok):
106112
("pipeline", self._pipeline)])
107113
cmd.update(self._options)
108114

109-
# Cache read preference for easy access.
110-
read_preference = self._target._read_preference_for(session)
111-
112115
# Apply this target's read concern if:
113116
# readConcern has not been specified as a kwarg and either
114117
# - server version is >= 4.2 or
@@ -134,7 +137,7 @@ def get_cursor(self, session, server, sock_info, slave_ok):
134137
self._database.name,
135138
cmd,
136139
slave_ok,
137-
read_preference,
140+
self.get_read_preference(session),
138141
self._target.codec_options,
139142
parse_write_concern_error=True,
140143
read_concern=read_concern,

pymongo/collection.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2287,7 +2287,7 @@ def _aggregate(self, aggregation_command, pipeline, cursor_class, session,
22872287
self, cursor_class, pipeline, kwargs, explicit_session,
22882288
user_fields={'cursor': {'firstBatch': 1}}, use_cursor=use_cursor)
22892289
return self.__database.client._retryable_read(
2290-
cmd.get_cursor, self._read_preference_for(session), session,
2290+
cmd.get_cursor, cmd.get_read_preference(session), session,
22912291
retryable=not cmd._performs_write)
22922292

22932293
def aggregate(self, pipeline, session=None, **kwargs):
@@ -2313,11 +2313,9 @@ def aggregate(self, pipeline, session=None, **kwargs):
23132313
- `useCursor` (bool): Deprecated. Will be removed in PyMongo 4.0.
23142314
23152315
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
2316-
:class:`Collection`. Please note that using the ``$out`` and ``$merge``
2317-
pipeline stages requires a read preference of
2318-
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default).
2319-
The server will raise an error if the ``$out`` or ``$merge`` pipeline
2320-
stages are used with any other read preference.
2316+
:class:`Collection`, except when ``$out`` or ``$merge`` are used, in
2317+
which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`
2318+
is used.
23212319
23222320
.. note:: This method does not support the 'explain' option. Please
23232321
use :meth:`~pymongo.database.Database.command` instead. An
@@ -2337,11 +2335,12 @@ def aggregate(self, pipeline, session=None, **kwargs):
23372335
A :class:`~pymongo.command_cursor.CommandCursor` over the result
23382336
set.
23392337
2340-
.. versionchanged:: 3.9
2341-
Added support for the ``$merge`` pipeline stage.
23422338
.. versionchanged:: 3.9
23432339
Apply this collection's read concern to pipelines containing the
23442340
`$out` stage when connected to MongoDB >= 4.2.
2341+
Added support for the ``$merge`` pipeline stage.
2342+
Aggregations that write always use read preference
2343+
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
23452344
.. versionchanged:: 3.6
23462345
Added the `session` parameter. Added the `maxAwaitTimeMS` option.
23472346
Deprecated the `useCursor` option.

pymongo/database.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -477,11 +477,9 @@ def aggregate(self, pipeline, session=None, **kwargs):
477477
:class:`~pymongo.collation.Collation`.
478478
479479
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
480-
:class:`Database`. Please note that using the ``$out`` or ``$merge``
481-
pipeline stages requires a read preference of
482-
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default).
483-
The server will raise an error if the ``$out`` or ``$merge`` pipeline
484-
stages is used with any other read preference.
480+
:class:`Database`, except when ``$out`` or ``$merge`` are used, in
481+
which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`
482+
is used.
485483
486484
.. note:: This method does not support the 'explain' option. Please
487485
use :meth:`~pymongo.database.Database.command` instead.
@@ -512,7 +510,7 @@ def aggregate(self, pipeline, session=None, **kwargs):
512510
self, CommandCursor, pipeline, kwargs, session is not None,
513511
user_fields={'cursor': {'firstBatch': 1}})
514512
return self.client._retryable_read(
515-
cmd.get_cursor, self._read_preference_for(s), s,
513+
cmd.get_cursor, cmd.get_read_preference(s), s,
516514
retryable=not cmd._performs_write)
517515

518516
def watch(self, pipeline=None, full_document='default', resume_after=None,

test/test_read_preferences.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,11 @@ def test_aggregate(self):
468468
'aggregate',
469469
[{'$project': {'_id': 1}}])
470470

471+
def test_aggregate_write(self):
472+
self._test_coll_helper(False, self.c.pymongo_test.test,
473+
'aggregate',
474+
[{'$project': {'_id': 1}}, {'$out': "agg_write_test"}])
475+
471476

472477
class TestMovingAverage(unittest.TestCase):
473478
def test_moving_average(self):

0 commit comments

Comments
 (0)