Skip to content

Commit 98e77ba

Browse files
committed
PYTHON-1483 Prohibit unack'ed writes with explicit sessions
1 parent 8573099 commit 98e77ba

File tree

8 files changed

+179
-79
lines changed

8 files changed

+179
-79
lines changed

doc/changelog.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ Unavoidable breaking changes:
8888
:class:`~pymongo.errors.OperationFailure`.
8989
- :meth:`~pymongo.collection.Collection.parallel_scan` no longer uses an
9090
implicit session. Explicit sessions are still supported.
91+
- Unacknowledged writes (``w=0``) with an explicit ``session`` parameter now
92+
raise a client side error. Since PyMongo does not wait for a response for an
93+
unacknowledged write, two unacknowledged writes run serially by the client
94+
may be executed simultaneously on the server. However, the server requires a
95+
single session must not be used simultaneously by more than one operation.
96+
Therefore explicit sessions cannot support unacknowledged writes.
97+
Unacknowledged writes without a ``session`` parameter are still supported.
9198

9299

93100
Issues Resolved

pymongo/bulk.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from bson.objectid import ObjectId
2424
from bson.raw_bson import RawBSONDocument
2525
from bson.son import SON
26+
from pymongo.client_session import _validate_session_write_concern
2627
from pymongo.common import (validate_is_mapping,
2728
validate_is_document_type,
2829
validate_ok_for_replace,
@@ -498,6 +499,7 @@ def execute(self, write_concern, session):
498499
self.executed = True
499500
write_concern = (WriteConcern(**write_concern) if
500501
write_concern else self.collection.write_concern)
502+
session = _validate_session_write_concern(session, write_concern)
501503

502504
if self.ordered:
503505
generator = self.gen_ordered()

pymongo/client_session.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,28 @@ def read_preference(self):
192192
return self._read_preference
193193

194194

195+
def _validate_session_write_concern(session, write_concern):
196+
"""Validate that an explicit session is not used with an unack'ed write.
197+
198+
Returns the session to use for the next operation.
199+
"""
200+
if session:
201+
if write_concern is not None and not write_concern.acknowledged:
202+
# For unacknowledged writes without an explicit session,
203+
# drivers SHOULD NOT use an implicit session. If a driver
204+
# creates an implicit session for unacknowledged writes
205+
# without an explicit session, the driver MUST NOT send the
206+
# session ID.
207+
if session._implicit:
208+
return None
209+
else:
210+
raise ConfigurationError(
211+
'Explicit sessions are incompatible with '
212+
'unacknowledged write concern: %r' % (
213+
write_concern,))
214+
return session
215+
216+
195217
class _TransactionContext(object):
196218
"""Internal transaction context manager for start_transaction."""
197219
def __init__(self, session):
@@ -243,14 +265,16 @@ def _reraise_with_unknown_commit(exc):
243265

244266
class ClientSession(object):
245267
"""A session for ordering sequential operations."""
246-
def __init__(self, client, server_session, options, authset):
268+
def __init__(self, client, server_session, options, authset, implicit):
247269
# A MongoClient, a _ServerSession, a SessionOptions, and a set.
248270
self._client = client
249271
self._server_session = server_session
250272
self._options = options
251273
self._authset = authset
252274
self._cluster_time = None
253275
self._operation_time = None
276+
# Is this an implicitly created session?
277+
self._implicit = implicit
254278
self._transaction = _Transaction(None)
255279

256280
def end_session(self):

pymongo/collection.py

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2258,10 +2258,6 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
22582258
if first_batch_size is not None and not dollar_out:
22592259
kwargs["cursor"]["batchSize"] = first_batch_size
22602260

2261-
if (sock_info.max_wire_version >= 5 and dollar_out and
2262-
not self.write_concern.is_server_default):
2263-
cmd['writeConcern'] = self.write_concern.document
2264-
22652261
cmd.update(kwargs)
22662262
# Apply this Collection's read concern if $out is not in the
22672263
# pipeline.
@@ -2271,6 +2267,10 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
22712267
read_concern = self.read_concern
22722268
else:
22732269
read_concern = None
2270+
if 'writeConcern' not in cmd and dollar_out:
2271+
write_concern = self.write_concern
2272+
else:
2273+
write_concern = None
22742274

22752275
# Avoid auto-injecting a session: aggregate() passes a session,
22762276
# aggregate_raw_batches() passes none.
@@ -2282,17 +2282,18 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
22822282
self.codec_options,
22832283
parse_write_concern_error=True,
22842284
read_concern=read_concern,
2285+
write_concern=write_concern,
22852286
collation=collation,
22862287
session=session,
22872288
client=self.__database.client)
22882289

22892290
if "cursor" in result:
22902291
cursor = result["cursor"]
22912292
else:
2292-
# Pre-MongoDB 2.6. Fake a cursor.
2293+
# Pre-MongoDB 2.6 or unacknowledged write. Fake a cursor.
22932294
cursor = {
22942295
"id": 0,
2295-
"firstBatch": result["result"],
2296+
"firstBatch": result.get("result", []),
22962297
"ns": self.full_name,
22972298
}
22982299

@@ -2586,14 +2587,15 @@ def rename(self, new_name, session=None, **kwargs):
25862587

25872588
new_name = "%s.%s" % (self.__database.name, new_name)
25882589
cmd = SON([("renameCollection", self.__full_name), ("to", new_name)])
2590+
cmd.update(kwargs)
2591+
write_concern = self._write_concern_for_cmd(cmd)
2592+
25892593
with self._socket_for_writes() as sock_info:
25902594
with self.__database.client._tmp_session(session) as s:
2591-
if (sock_info.max_wire_version >= 5 and
2592-
not self.write_concern.is_server_default):
2593-
cmd['writeConcern'] = self.write_concern.document
2594-
cmd.update(kwargs)
25952595
return sock_info.command(
2596-
'admin', cmd, parse_write_concern_error=True,
2596+
'admin', cmd,
2597+
write_concern=write_concern,
2598+
parse_write_concern_error=True,
25972599
session=s, client=self.__database.client)
25982600

25992601
def distinct(self, key, filter=None, session=None, **kwargs):
@@ -2716,20 +2718,20 @@ def map_reduce(self, map, reduce, out, full_response=False, session=None,
27162718
inline = 'inline' in cmd['out']
27172719
sock_ctx, read_pref = self._socket_for_primary_reads(session)
27182720
with sock_ctx as (sock_info, slave_ok):
2719-
if (sock_info.max_wire_version >= 5 and
2720-
not self.write_concern.is_server_default and
2721-
not inline):
2722-
cmd['writeConcern'] = self.write_concern.document
2723-
cmd.update(kwargs)
27242721
if (sock_info.max_wire_version >= 4 and 'readConcern' not in cmd and
27252722
inline):
27262723
read_concern = self.read_concern
27272724
else:
27282725
read_concern = None
2726+
if 'writeConcern' not in cmd and not inline:
2727+
write_concern = self.write_concern
2728+
else:
2729+
write_concern = None
27292730

27302731
response = self._command(
27312732
sock_info, cmd, slave_ok, read_pref,
27322733
read_concern=read_concern,
2734+
write_concern=write_concern,
27332735
collation=collation, session=session)
27342736

27352737
if full_response or not response.get('result'):
@@ -2796,6 +2798,13 @@ def inline_map_reduce(self, map, reduce, full_response=False, session=None,
27962798
else:
27972799
return res.get("results")
27982800

2801+
def _write_concern_for_cmd(self, cmd):
2802+
raw_wc = cmd.get('writeConcern')
2803+
if raw_wc is not None:
2804+
return WriteConcern(**raw_wc)
2805+
else:
2806+
return self.write_concern
2807+
27992808
def __find_and_modify(self, filter, projection, sort, upsert=None,
28002809
return_document=ReturnDocument.BEFORE,
28012810
array_filters=None, session=None, **kwargs):
@@ -2818,11 +2827,7 @@ def __find_and_modify(self, filter, projection, sort, upsert=None,
28182827
common.validate_boolean("upsert", upsert)
28192828
cmd["upsert"] = upsert
28202829

2821-
write_concern = cmd.get('writeConcern')
2822-
if write_concern is not None:
2823-
acknowledged = write_concern.get("w") != 0
2824-
else:
2825-
acknowledged = self.write_concern.acknowledged
2830+
write_concern = self._write_concern_for_cmd(cmd)
28262831

28272832
def _find_and_modify(session, sock_info, retryable_write):
28282833
if array_filters is not None:
@@ -2835,20 +2840,20 @@ def _find_and_modify(session, sock_info, retryable_write):
28352840
'arrayFilters is unsupported for unacknowledged '
28362841
'writes.')
28372842
cmd["arrayFilters"] = array_filters
2838-
if sock_info.max_wire_version >= 4 and 'writeConcern' not in cmd:
2839-
wc_doc = self.write_concern.document
2840-
if wc_doc:
2841-
cmd['writeConcern'] = wc_doc
2843+
if (sock_info.max_wire_version >= 4 and
2844+
not write_concern.is_server_default):
2845+
cmd['writeConcern'] = write_concern.document
28422846
out = self._command(sock_info, cmd,
28432847
read_preference=ReadPreference.PRIMARY,
2848+
write_concern=write_concern,
28442849
allowable_errors=[_NO_OBJ_ERROR],
28452850
collation=collation, session=session,
28462851
retryable_write=retryable_write)
28472852
_check_write_command_response(out)
28482853
return out.get("value")
28492854

28502855
return self.__database.client._retryable_write(
2851-
acknowledged, _find_and_modify, session)
2856+
write_concern.acknowledged, _find_and_modify, session)
28522857

28532858
def find_one_and_delete(self, filter,
28542859
projection=None, sort=None, session=None, **kwargs):
@@ -3245,17 +3250,12 @@ def find_and_modify(self, query={}, update=None,
32453250
cmd = SON([("findAndModify", self.__name)])
32463251
cmd.update(kwargs)
32473252

3248-
write_concern = cmd.get('writeConcern')
3249-
if write_concern is not None:
3250-
acknowledged = write_concern.get("w") != 0
3251-
else:
3252-
acknowledged = self.write_concern.acknowledged
3253+
write_concern = self._write_concern_for_cmd(cmd)
32533254

32543255
def _find_and_modify(session, sock_info, retryable_write):
3255-
if sock_info.max_wire_version >= 4 and 'writeConcern' not in cmd:
3256-
wc_doc = self.write_concern.document
3257-
if wc_doc:
3258-
cmd['writeConcern'] = wc_doc
3256+
if (sock_info.max_wire_version >= 4 and
3257+
not write_concern.is_server_default):
3258+
cmd['writeConcern'] = write_concern.document
32593259
result = self._command(
32603260
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
32613261
allowable_errors=[_NO_OBJ_ERROR], collation=collation,
@@ -3265,7 +3265,7 @@ def _find_and_modify(session, sock_info, retryable_write):
32653265
return result
32663266

32673267
out = self.__database.client._retryable_write(
3268-
acknowledged, _find_and_modify, None)
3268+
write_concern.acknowledged, _find_and_modify, None)
32693269

32703270
if not out['ok']:
32713271
if out["errmsg"] == _NO_OBJ_ERROR:

pymongo/database.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -498,11 +498,6 @@ def _command(self, sock_info, command, slave_ok=False, value=1, check=True,
498498
if isinstance(command, string_type):
499499
command = SON([(command, value)])
500500

501-
if (sock_info.max_wire_version >= 5 and
502-
write_concern and
503-
not write_concern.is_server_default):
504-
command['writeConcern'] = write_concern.document
505-
506501
command.update(kwargs)
507502
with self.__client._tmp_session(session) as s:
508503
return sock_info.command(
@@ -513,6 +508,7 @@ def _command(self, sock_info, command, slave_ok=False, value=1, check=True,
513508
codec_options,
514509
check,
515510
allowable_errors,
511+
write_concern=write_concern,
516512
parse_write_concern_error=parse_write_concern_error,
517513
session=s,
518514
client=self.__client)

pymongo/mongo_client.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,6 +1498,21 @@ def _process_periodic_tasks(self):
14981498
except Exception:
14991499
helpers._handle_exception()
15001500

1501+
def __start_session(self, implicit, **kwargs):
1502+
# Driver Sessions Spec: "If startSession is called when multiple users
1503+
# are authenticated drivers MUST raise an error with the error message
1504+
# 'Cannot call startSession when multiple users are authenticated.'"
1505+
authset = set(self.__all_credentials.values())
1506+
if len(authset) > 1:
1507+
raise InvalidOperation("Cannot call start_session when"
1508+
" multiple users are authenticated")
1509+
1510+
# Raises ConfigurationError if sessions are not supported.
1511+
server_session = self._get_server_session()
1512+
opts = client_session.SessionOptions(**kwargs)
1513+
return client_session.ClientSession(
1514+
self, server_session, opts, authset, implicit)
1515+
15011516
def start_session(self,
15021517
causal_consistency=True,
15031518
default_transaction_options=None):
@@ -1519,21 +1534,10 @@ def start_session(self,
15191534
15201535
.. versionadded:: 3.6
15211536
"""
1522-
# Driver Sessions Spec: "If startSession is called when multiple users
1523-
# are authenticated drivers MUST raise an error with the error message
1524-
# 'Cannot call startSession when multiple users are authenticated.'"
1525-
authset = set(self.__all_credentials.values())
1526-
if len(authset) > 1:
1527-
raise InvalidOperation("Cannot call start_session when"
1528-
" multiple users are authenticated")
1529-
1530-
# Raises ConfigurationError if sessions are not supported.
1531-
server_session = self._get_server_session()
1532-
opts = client_session.SessionOptions(
1537+
return self.__start_session(
1538+
False,
15331539
causal_consistency=causal_consistency,
15341540
default_transaction_options=default_transaction_options)
1535-
return client_session.ClientSession(
1536-
self, server_session, opts, authset)
15371541

15381542
def _get_server_session(self):
15391543
"""Internal: start or resume a _ServerSession."""
@@ -1549,9 +1553,9 @@ def _ensure_session(self, session=None):
15491553
return session
15501554

15511555
try:
1552-
# Don't make implied sessions causally consistent. Applications
1556+
# Don't make implicit sessions causally consistent. Applications
15531557
# should always opt-in.
1554-
return self.start_session(causal_consistency=False)
1558+
return self.__start_session(True, causal_consistency=False)
15551559
except (ConfigurationError, InvalidOperation):
15561560
# Sessions not supported, or multiple users authenticated.
15571561
return None

pymongo/pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class _SSLCertificateError(ValueError):
4040
from bson.py3compat import imap, itervalues, _unicode, integer_types
4141
from bson.son import SON
4242
from pymongo import auth, helpers, thread_util, __version__
43+
from pymongo.client_session import _validate_session_write_concern
4344
from pymongo.common import (MAX_BSON_SIZE,
4445
MAX_MESSAGE_SIZE,
4546
MAX_WIRE_VERSION,
@@ -534,6 +535,7 @@ def command(self, dbname, spec, slave_ok=False,
534535
- `publish_events`: Should we publish events for this command?
535536
"""
536537
self.validate_session(client, session)
538+
session = _validate_session_write_concern(session, write_concern)
537539

538540
# Ensure command name remains in first place.
539541
if not isinstance(spec, ORDERED_TYPES):

0 commit comments

Comments
 (0)