Skip to content

Commit 64f8ea1

Browse files
authored
fix: applies the global timeout value to each query (#215)
1 parent f68b9c1 commit 64f8ea1

File tree

4 files changed

+47
-28
lines changed

4 files changed

+47
-28
lines changed

lib/mongo.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1502,7 +1502,7 @@ defmodule Mongo do
15021502
@spec exec_command_session(GenServer.server(), BSON.document(), Keyword.t()) ::
15031503
{:ok, BSON.document() | nil} | {:error, Mongo.Error.t()}
15041504
def exec_command_session(session, cmd, opts) do
1505-
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
1505+
with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
15061506
{:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
15071507
:ok <- Session.update_session(session, response, opts),
15081508
{:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do

lib/mongo/monitor.ex

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,18 +186,8 @@ defmodule Mongo.Monitor do
186186
##
187187
# Get a new server description from the server and send it to the Topology process.
188188
##
189-
defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do
190-
case get_server_description(state) do
191-
%{round_trip_time: round_trip_time} ->
192-
## debug info("Updating round_trip_time: #{inspect round_trip_time}")
193-
Topology.update_rrt(topology_pid, address, round_trip_time)
194-
195-
%{state | round_trip_time: round_trip_time}
196-
197-
error ->
198-
warning("Unable to round trip time because of #{inspect(error)}")
199-
state
200-
end
189+
defp update_server_description(%{mode: :streaming_mode} = state) do
190+
state
201191
end
202192

203193
##

lib/mongo/session.ex

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,15 @@ defmodule Mongo.Session do
186186

187187
@doc """
188188
Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically.
189+
The global session timeout is merged to the options as well.
189190
"""
190-
@spec bind_session(Session.t(), BSON.document()) :: {:ok, pid, BSON.document()} | {:error, term()}
191-
def bind_session(nil, _cmd) do
191+
@spec bind_session(Session.t(), BSON.document(), Keyword.t()) :: {:ok, pid, BSON.document(), Keyword.t()} | {:error, term()}
192+
def bind_session(nil, _cmd, _opts) do
192193
{:error, Mongo.Error.exception("No session")}
193194
end
194195

195-
def bind_session(pid, cmd) do
196-
call(pid, {:bind_session, cmd})
196+
def bind_session(pid, cmd, opts) do
197+
call(pid, {:bind_session, cmd, opts})
197198
end
198199

199200
@doc """
@@ -462,13 +463,16 @@ defmodule Mongo.Session do
462463
##
463464
# bind session: only if wire_version >= 6, MongoDB 3.6.x and no transaction is running: only lsid and the transaction-id is added
464465
#
465-
def handle_call_event({:bind_session, cmd}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data)
466+
def handle_call_event({:bind_session, cmd, client_opts}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data)
466467
when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
467468
## only if retryable_writes are enabled!
468469
options =
469470
case opts[:retryable_writes] do
470-
true -> [lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
471-
_ -> [lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
471+
true ->
472+
[lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
473+
474+
_ ->
475+
[lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
472476
end
473477

474478
cmd =
@@ -477,11 +481,12 @@ defmodule Mongo.Session do
477481
|> ReadPreference.add_read_preference(opts)
478482
|> filter_nils()
479483

480-
{:keep_state_and_data, {:ok, conn, cmd}}
484+
client_opts = merge_timeout(client_opts, opts)
485+
{:keep_state_and_data, {:ok, conn, cmd, client_opts}}
481486
end
482487

483-
def handle_call_event({:bind_session, cmd}, :starting_transaction, %Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do
484-
result =
488+
def handle_call_event({:bind_session, cmd, client_opts}, :starting_transaction, %Session{conn: conn, opts: opts, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do
489+
cmd =
485490
Keyword.merge(cmd,
486491
readConcern: read_concern(data, Keyword.get(cmd, :readConcern)),
487492
lsid: %{id: id},
@@ -492,10 +497,11 @@ defmodule Mongo.Session do
492497
|> filter_nils()
493498
|> Keyword.drop(~w(writeConcern)a)
494499

495-
{:next_state, :transaction_in_progress, {:ok, conn, result}}
500+
client_opts = merge_timeout(client_opts, opts)
501+
{:next_state, :transaction_in_progress, {:ok, conn, cmd, client_opts}}
496502
end
497503

498-
def handle_call_event({:bind_session, cmd}, :transaction_in_progress, %Session{conn: conn, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
504+
def handle_call_event({:bind_session, cmd, client_opts}, :transaction_in_progress, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
499505
result =
500506
Keyword.merge(cmd,
501507
lsid: %{id: id},
@@ -504,12 +510,13 @@ defmodule Mongo.Session do
504510
)
505511
|> Keyword.drop(~w(writeConcern readConcern)a)
506512

507-
{:keep_state_and_data, {:ok, conn, result}}
513+
client_opts = merge_timeout(client_opts, opts)
514+
{:keep_state_and_data, {:ok, conn, result, client_opts}}
508515
end
509516

510517
# In case of wire_version < 6 we do nothing
511-
def handle_call_event({:bind_session, cmd}, _transaction, %Session{conn: conn}) do
512-
{:keep_state_and_data, {:ok, conn, cmd}}
518+
def handle_call_event({:bind_session, cmd, client_opts}, _transaction, %Session{conn: conn}) do
519+
{:keep_state_and_data, {:ok, conn, cmd, client_opts}}
513520
end
514521

515522
def handle_call_event({:commit_transaction, _start_time}, :starting_transaction, _data) do
@@ -710,4 +717,14 @@ defmodule Mongo.Session do
710717
def in_session(session, _topology_pid, _read_write_type, fun, opts) do
711718
fun.(session, opts)
712719
end
720+
721+
defp merge_timeout(opts, default_ops) do
722+
case Keyword.get(default_ops, :timeout) do
723+
nil ->
724+
opts
725+
726+
timeout ->
727+
Keyword.put_new(opts, :timeout, timeout)
728+
end
729+
end
713730
end

lib/mongo/topology.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,8 @@ defmodule Mongo.Topology do
383383

384384
## found
385385
{:ok, {address, opts}} ->
386+
opts = merge_timeout(opts, state.opts)
387+
386388
with {:ok, connection} <- get_connection(address, state),
387389
wire_version <- wire_version(address, topology),
388390
{server_session, new_state} <- checkout_server_session(state),
@@ -593,4 +595,14 @@ defmodule Mongo.Topology do
593595
Keyword.put_new(opts, :read_preference, read_preference)
594596
end
595597
end
598+
599+
defp merge_timeout(opts, default_ops) do
600+
case Keyword.get(default_ops, :timeout) do
601+
nil ->
602+
opts
603+
604+
timeout ->
605+
Keyword.put_new(opts, :timeout, timeout)
606+
end
607+
end
596608
end

0 commit comments

Comments
 (0)