Skip to content

Commit dad61a0

Browse files
committed
PYTHON-1332 - Update session's last_use
1 parent d2f0ade commit dad61a0

File tree

6 files changed

+32
-11
lines changed

6 files changed

+32
-11
lines changed

pymongo/bulk.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,9 @@ def execute_command(self, sock_info, generator, write_concern, session):
320320
if self.bypass_doc_val and sock_info.max_wire_version >= 4:
321321
cmd['bypassDocumentValidation'] = True
322322
if s:
323-
cmd['lsid'] = s.session_id
323+
cmd['lsid'] = s._use_lsid()
324324
bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id,
325-
listeners)
325+
listeners, s)
326326

327327
results = _do_batched_write_command(
328328
self.namespace, run.op_type, cmd,

pymongo/client_session.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,13 @@ def has_ended(self):
120120
"""True if this session is finished."""
121121
return self._server_session is None
122122

123+
def _use_lsid(self):
124+
# Internal function.
125+
if self._server_session is None:
126+
raise InvalidOperation("Cannot use ended session")
127+
128+
return self._server_session.use_lsid()
129+
123130

124131
class _ServerSession(object):
125132
def __init__(self):
@@ -133,6 +140,10 @@ def timed_out(self, session_timeout_minutes):
133140
# Timed out if we have less than a minute to live.
134141
return idle_seconds > (session_timeout_minutes - 1) * 60
135142

143+
def use_lsid(self):
144+
self.last_use = monotonic.time()
145+
return self.session_id
146+
136147

137148
class _ServerSessionPool(collections.deque):
138149
"""Pool of _ServerSession objects.

pymongo/collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,12 +635,12 @@ def gen():
635635
command['bypassDocumentValidation'] = True
636636
bwc = message._BulkWriteContext(
637637
self.database.name, command, sock_info, op_id,
638-
self.database.client._event_listeners)
638+
self.database.client._event_listeners, session=None)
639639
if acknowledged:
640640
# Batched insert command.
641641
with self.__database.client._tmp_session(session) as s:
642642
if s:
643-
command['lsid'] = s.session_id
643+
command['lsid'] = s._use_lsid()
644644
results = message._do_batched_write_command(
645645
self.database.name + ".$cmd", message._INSERT, command,
646646
gen(), check_keys, self.__write_response_codec_options, bwc)

pymongo/message.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def _gen_explain_command(
174174
explain = SON([('explain', cmd)])
175175

176176
if session:
177-
explain['lsid'] = session.session_id
177+
explain['lsid'] = session._use_lsid()
178178

179179
return explain
180180

@@ -213,7 +213,7 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
213213
for opt, val in _OPTIONS.items()
214214
if options & val])
215215
if session:
216-
cmd['lsid'] = session.session_id
216+
cmd['lsid'] = session._use_lsid()
217217
return cmd
218218

219219

@@ -227,7 +227,7 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms,
227227
if max_await_time_ms is not None:
228228
cmd['maxTimeMS'] = max_await_time_ms
229229
if session:
230-
cmd['lsid'] = session.session_id
230+
cmd['lsid'] = session._use_lsid()
231231
return cmd
232232

233233

@@ -565,10 +565,11 @@ class _BulkWriteContext(object):
565565
"""A wrapper around SocketInfo for use with write splitting functions."""
566566

567567
__slots__ = ('db_name', 'command', 'sock_info', 'op_id',
568-
'name', 'field', 'publish', 'start_time', 'listeners')
568+
'name', 'field', 'publish', 'start_time', 'listeners',
569+
'session')
569570

570-
def __init__(
571-
self, database_name, command, sock_info, operation_id, listeners):
571+
def __init__(self, database_name, command, sock_info, operation_id,
572+
listeners, session):
572573
self.db_name = database_name
573574
self.command = command
574575
self.sock_info = sock_info
@@ -578,6 +579,7 @@ def __init__(
578579
self.name = next(iter(command))
579580
self.field = _FIELD_MAP[self.name]
580581
self.start_time = datetime.datetime.now() if self.publish else None
582+
self.session = session
581583

582584
@property
583585
def max_bson_size(self):
@@ -628,6 +630,9 @@ def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
628630
def write_command(self, request_id, msg, docs):
629631
"""A proxy for SocketInfo.write_command that handles event publishing.
630632
"""
633+
if self.session:
634+
# Update last_use time.
635+
self.session._use_lsid()
631636
if self.publish:
632637
duration = datetime.datetime.now() - self.start_time
633638
self._start(request_id, docs)

pymongo/network.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
8282
if spec.__class__ is dict:
8383
# Ensure command name remains in first place.
8484
spec = SON(spec)
85-
spec['lsid'] = session.session_id
85+
spec['lsid'] = session._use_lsid()
8686

8787
# Publish the original command document, perhaps with session id.
8888
orig = spec

test/test_session.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from pymongo.errors import (ConfigurationError,
2525
InvalidOperation,
2626
OperationFailure)
27+
from pymongo.monotonic import time as _time
2728
from test import IntegrationTest, client_context, db_user, db_pwd
2829
from test.utils import ignore_deprecations, rs_or_single_client, EventListener
2930

@@ -74,12 +75,16 @@ def _test_ops(self, client, *ops, **kwargs):
7475

7576
for f, args, kw in ops:
7677
with client.start_session() as s:
78+
last_use = s._server_session.last_use
79+
start = _time()
80+
self.assertLessEqual(last_use, start)
7781
listener.results.clear()
7882
# In case "f" modifies its inputs.
7983
args = copy.copy(args)
8084
kw = copy.copy(kw)
8185
kw['session'] = s
8286
f(*args, **kw)
87+
self.assertGreaterEqual(s._server_session.last_use, start)
8388
self.assertGreaterEqual(len(listener.results['started']), 1)
8489
for event in listener.results['started']:
8590
self.assertTrue(

0 commit comments

Comments
 (0)