Skip to content

Commit 08d7fbc

Browse files
committed
fix: improved the reconnection workflow
1 parent 1218418 commit 08d7fbc

File tree

10 files changed

+124
-43
lines changed

10 files changed

+124
-43
lines changed

lib/mongo/mongo_db_connection.ex

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ defmodule Mongo.MongoDBConnection do
3737
wire_version: 0,
3838
auth_mechanism: opts[:auth_mechanism] || nil,
3939
connection_type: Keyword.fetch!(opts, :connection_type),
40+
server_pid: Keyword.get(opts, :server_pid),
4041
topology_pid: Keyword.fetch!(opts, :topology_pid),
4142
stable_api: Keyword.get(opts, :stable_api),
4243
use_op_msg: Keyword.get(opts, :stable_api) != nil,
@@ -48,8 +49,24 @@ defmodule Mongo.MongoDBConnection do
4849
end
4950

5051
@impl true
51-
def disconnect(_error, %{connection: {mod, socket}, connection_type: type, topology_pid: pid, host: host}) do
52-
GenServer.cast(pid, {:disconnect, type, host})
52+
## the stream monitor disconnects, we change the mode of the parent monitor
53+
def disconnect(_error, %{connection: {mod, socket}, connection_type: :stream_monitor, parent_pid: parent_pid}) do
54+
## Logger.debug("MongoDB-Connection: disconnected stream monitor: #{inspect(error)}")
55+
GenServer.cast(parent_pid, :stop_streaming_mode)
56+
mod.close(socket)
57+
:ok
58+
end
59+
60+
def disconnect(_error, %{connection: {mod, socket}, connection_type: :monitor, topology_pid: topology_pid, host: host, server_pid: server_pid}) do
61+
## Logger.debug("MongoDB-Connection: disconnected: #{inspect(error)}, #{inspect(server_pid)}, #{inspect(host)}, cast disconnect :monitor")
62+
GenServer.cast(server_pid, :stop_streaming_mode)
63+
GenServer.cast(topology_pid, {:disconnect, :monitor, host, server_pid})
64+
mod.close(socket)
65+
:ok
66+
end
67+
68+
def disconnect(_error, %{connection: {mod, socket}}) do
69+
## Logger.debug("MongoDB-Connection: disconnected: #{inspect error}, #{inspect type}, #{inspect host} #{inspect server_pid}")
5370
mod.close(socket)
5471
:ok
5572
end

lib/mongo/monitor.ex

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ defmodule Mongo.Monitor do
4747
GenServer.cast(pid, :update)
4848
end
4949

50+
def stop_streaming_mode(pid) do
51+
GenServer.cast(pid, :stop_streaming_mode)
52+
end
53+
5054
def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do
5155
GenServer.cast(pid, {:update, heartbeat_frequency_ms})
5256
end
@@ -59,7 +63,7 @@ defmodule Mongo.Monitor do
5963
Initialize the monitor process
6064
"""
6165
def init([address, topology_pid, heartbeat_frequency_ms, connection_opts]) do
62-
## debug info("Starting monitor process with pid #{inspect self()}, #{inspect address}")
66+
## Logger.info("Starting monitor process with pid #{inspect(self())}, #{inspect(address)}")
6367

6468
# monitors don't authenticate and use the "admin" database
6569
opts =
@@ -73,6 +77,7 @@ defmodule Mongo.Monitor do
7377
|> Keyword.put(:topology_pid, topology_pid)
7478
|> Keyword.put(:pool_size, 1)
7579
|> Keyword.put(:idle_interval, 5_000)
80+
|> Keyword.put(:server_pid, self())
7681

7782
with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
7883
{:ok,
@@ -97,18 +102,17 @@ defmodule Mongo.Monitor do
97102
end
98103

99104
@doc """
100-
In case of terminating we stop the our linked processes as well:
105+
In case of terminating we stop our linked processes as well:
101106
* connection
102107
* streaming process
103108
"""
104109
def terminate(reason, %{connection_pid: connection_pid, streaming_pid: nil}) do
105-
## debug info("Terminating monitor for reason #{inspect reason}")
110+
## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}")
106111
GenServer.stop(connection_pid, reason)
107112
end
108113

109114
def terminate(reason, %{connection_pid: connection_pid, streaming_pid: streaming_pid}) do
110-
## debug info("Terminating monitor for reason #{inspect reason}, #{inspect self()}, #{inspect streaming_pid}")
111-
115+
## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}, #{inspect(streaming_pid)}")
112116
GenServer.stop(connection_pid, reason)
113117
GenServer.stop(streaming_pid, reason)
114118
end
@@ -117,6 +121,7 @@ defmodule Mongo.Monitor do
117121
Report the connection event, so the topology process can now create the connection pool.
118122
"""
119123
def connected(_connection, me, topology_pid) do
124+
## Logger.info("Monitor #{inspect(me)} connected to server! ")
120125
Topology.monitor_connected(topology_pid, me)
121126
GenServer.cast(me, :update)
122127
end
@@ -125,6 +130,15 @@ defmodule Mongo.Monitor do
125130
{:reply, Map.put(state, :pid, self()), state}
126131
end
127132

133+
def handle_cast(:stop_streaming_mode, %{streaming_pid: streaming_pid} = state) when streaming_pid != nil do
134+
spawn(fn -> GenServer.stop(streaming_pid) end)
135+
{:noreply, %{state | mode: :polling_mode, streaming_pid: nil}}
136+
end
137+
138+
def handle_cast(:stop_streaming_mode, state) do
139+
{:noreply, %{state | mode: :polling_mode}}
140+
end
141+
128142
##
129143
# Update the server description or the rrt value
130144
##
@@ -207,11 +221,11 @@ defmodule Mongo.Monitor do
207221
# Starts the streaming mode
208222
##
209223
defp start_streaming_mode(%{address: address, topology_pid: topology_pid, opts: opts} = state, _server_description) do
210-
args = [topology_pid, address, opts]
224+
args = [self(), topology_pid, address, opts]
211225

212226
case StreamingHelloMonitor.start_link(args) do
213227
{:ok, pid} ->
214-
## debug info("Starting streaming mode: #{inspect self()}")
228+
## Logger.debug("Starting streaming mode: #{inspect(pid)}")
215229
%{state | mode: :streaming_mode, streaming_pid: pid, heartbeat_frequency_ms: 10_000}
216230

217231
error ->

lib/mongo/password_safe.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ defmodule Mongo.PasswordSafe do
1010

1111
use GenServer
1212

13-
def new() do
13+
def start_link() do
1414
GenServer.start_link(@me, [])
1515
end
1616

lib/mongo/repo.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ defmodule Mongo.Repo do
457457
@callback update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}
458458

459459
@doc """
460-
Same as `c:update/1` but raises an error.
460+
Same as `c:update/2` but raises an error.
461461
"""
462462
@callback update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()
463463

@@ -471,7 +471,7 @@ defmodule Mongo.Repo do
471471
@callback insert_or_update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}
472472

473473
@doc """
474-
Same as `c:insert_or_update/1` but raises an error.
474+
Same as `c:insert_or_update/2` but raises an error.
475475
"""
476476
@callback insert_or_update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()
477477

lib/mongo/streaming_hello_monitor.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ defmodule Mongo.StreamingHelloMonitor do
3030
@doc """
3131
Initialize the monitor process
3232
"""
33-
def init([topology_pid, address, opts]) do
33+
def init([monitor_pid, topology_pid, address, opts]) do
3434
heartbeat_frequency_ms = 10_000
3535

3636
opts =
3737
opts
3838
|> Keyword.drop([:after_connect])
3939
|> Keyword.put(:after_connect, {StreamingHelloMonitor, :connected, [self()]})
4040
|> Keyword.put(:connection_type, :stream_monitor)
41+
|> Keyword.put(:server_pid, self())
42+
|> Keyword.put(:monitor_pid, monitor_pid)
4143

4244
## debug info("Starting stream hello monitor with options #{inspect(opts, pretty: true)}")
4345

@@ -65,7 +67,7 @@ defmodule Mongo.StreamingHelloMonitor do
6567
In this case we stop the DBConnection.
6668
"""
6769
def terminate(reason, %{connection_pid: connection_pid}) do
68-
## debug info("Terminating streaming hello monitor for reason #{inspect reason}")
70+
## Logger.debug("Terminating streaming hello monitor for reason #{inspect(reason)}")
6971
GenServer.stop(connection_pid, reason)
7072
end
7173

@@ -84,7 +86,7 @@ defmodule Mongo.StreamingHelloMonitor do
8486
end
8587

8688
def handle_info({:EXIT, _pid, reason}, state) do
87-
## debug Logger.warn("Stopped with reason #{inspect reason}")
89+
Logger.warning("Stopped with reason #{inspect(reason)}")
8890
{:stop, reason, state}
8991
end
9092

lib/mongo/topology.ex

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ defmodule Mongo.Topology do
7979
GenServer.call(pid, :get_state)
8080
end
8181

82-
# 97
8382
def select_server(pid, type, opts \\ []) do
8483
timeout = Keyword.get(opts, :checkout_timeout, @default_checkout_timeout)
8584
GenServer.call(pid, {:select_server, type, opts}, timeout)
@@ -109,7 +108,7 @@ defmodule Mongo.Topology do
109108
end
110109

111110
def stop(pid) do
112-
GenServer.stop(pid)
111+
GenServer.stop(pid, :stop)
113112
end
114113

115114
## GenServer Callbacks
@@ -163,14 +162,7 @@ defmodule Mongo.Topology do
163162
end
164163
end
165164

166-
def terminate(_reason, state) do
167-
case state.opts[:pw_safe] do
168-
nil -> nil
169-
pid -> GenServer.stop(pid)
170-
end
171-
172-
Enum.each(state.connection_pools, fn {_address, pid} -> GenServer.stop(pid) end)
173-
Enum.each(state.monitors, fn {_address, pid} -> GenServer.stop(pid) end)
165+
def terminate(_reason, _state) do
174166
Mongo.Events.notify(%TopologyClosedEvent{topology_pid: self()})
175167
end
176168

@@ -204,18 +196,25 @@ defmodule Mongo.Topology do
204196
##
205197
# In case of :monitor or :stream_monitor we mark the server description of the address as unknown
206198
##
207-
def handle_cast({:disconnect, kind, address}, state) when kind in [:monitor, :stream_monitor] do
208-
server_description = ServerDescription.parse_hello_response(address, "#{inspect(kind)} disconnected")
199+
def handle_cast({:disconnect, :monitor, address, pid}, state) do
200+
server_description = ServerDescription.parse_hello_response(address, "monitor disconnected")
201+
## Logger.debug("Disconnect monitor with #{inspect(pid)}")
209202

210203
new_state =
211204
address
212-
|> remove_address(state)
205+
|> close_connection_pool(pid, state)
213206
|> maybe_reinit()
214207

215208
handle_cast({:server_description, server_description}, new_state)
216209
end
217210

218-
def handle_cast({:disconnect, _kind, _host}, state) do
211+
def handle_cast({:disconnect, :stream_monitor, _host, _pid}, state) do
212+
## IO.inspect("ignored: kind stream_monitor with #{inspect pid}")
213+
{:noreply, state}
214+
end
215+
216+
def handle_cast({:disconnect, _kind, _host, _pid}, state) do
217+
## IO.inspect("ignored: kind #{inspect kind}")
219218
{:noreply, state}
220219
end
221220

@@ -233,6 +232,7 @@ defmodule Mongo.Topology do
233232

234233
{host, ^monitor_pid} ->
235234
arbiters = fetch_arbiters(state)
235+
Mongo.Events.notify(%ServerOpeningEvent{address: host, topology_pid: self()})
236236

237237
if host in arbiters do
238238
state
@@ -243,8 +243,9 @@ defmodule Mongo.Topology do
243243
|> Keyword.put(:topology_pid, self())
244244
|> connect_opts_from_address(host)
245245

246+
## Logger.debug("Starting connection pool for #{inspect(host)}")
246247
{:ok, pool} = DBConnection.start_link(Mongo.MongoDBConnection, conn_opts)
247-
connection_pools = Map.put(state.connection_pools, host, pool)
248+
connection_pools = replace_pool(state.connection_pools, host, pool)
248249

249250
Process.send_after(self(), {:new_connection, state.waiting_pids}, 10)
250251

@@ -279,6 +280,49 @@ defmodule Mongo.Topology do
279280
{:noreply, state}
280281
end
281282

283+
## remove the address only if the pid is the same
284+
defp close_connection_pool(address, pid, state) do
285+
## Logger.debug("Closing connection pool by pid: #{inspect(state.monitors[address] == pid)}, #{inspect(pid)}, #{inspect(state.monitors[address])}")
286+
287+
case state.monitors[address] == pid do
288+
true ->
289+
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
290+
## stopping the connection pool
291+
case state.connection_pools[address] do
292+
nil ->
293+
:ok
294+
295+
pid ->
296+
if Process.alive?(pid) do
297+
## Logger.debug("Stopping the connection pool #{inspect(pid)} für #{inspect(address)}")
298+
GenServer.stop(pid)
299+
end
300+
end
301+
302+
%{state | connection_pools: Map.delete(state.connection_pools, address)}
303+
304+
false ->
305+
state
306+
end
307+
end
308+
309+
## replaces a pool for the host address
310+
defp replace_pool(connection_pools, host, pool) do
311+
## if we found an existing pool, we will stop it first
312+
case Map.get(connection_pools, host) do
313+
nil ->
314+
:noop
315+
316+
pid ->
317+
if Process.alive?(pid) do
318+
## Logger.debug("Stopping the connection pool #{inspect(pid)}")
319+
GenServer.stop(pid)
320+
end
321+
end
322+
323+
Map.put(connection_pools, host, pool)
324+
end
325+
282326
##
283327
# Update server description: in case of logical session the function creates a session pool for the `deployment`.
284328
#
@@ -510,9 +554,6 @@ defmodule Mongo.Topology do
510554
Enum.reduce(added, state, fn address, state ->
511555
server_description = state.topology.servers[address]
512556
connopts = connect_opts_from_address(state.opts, address)
513-
514-
Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()})
515-
516557
args = [server_description.address, self(), heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)]
517558
{:ok, pid} = Monitor.start_link(args)
518559

@@ -549,16 +590,23 @@ defmodule Mongo.Topology do
549590
end
550591

551592
defp remove_address(address, state) do
552-
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
553-
554593
case state.monitors[address] do
555-
nil -> :ok
556-
pid -> GenServer.stop(pid)
594+
nil ->
595+
:ok
596+
597+
pid ->
598+
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
599+
## Logger.debug("Stopping: #{inspect(pid)} for #{inspect(address)}")
600+
GenServer.stop(pid)
557601
end
558602

559603
case state.connection_pools[address] do
560-
nil -> :ok
561-
pid -> GenServer.stop(pid)
604+
nil ->
605+
:ok
606+
607+
pid ->
608+
## Logger.debug("Connection pool: #{inspect(address)}")
609+
GenServer.stop(pid)
562610
end
563611

564612
%{state | monitors: Map.delete(state.monitors, address), connection_pools: Map.delete(state.connection_pools, address)}

lib/mongo/url_parser.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ defmodule Mongo.UrlParser do
168168

169169
value ->
170170
## start GenServer and put id
171-
with {:ok, pid} <- Mongo.PasswordSafe.new(),
171+
with {:ok, pid} <- Mongo.PasswordSafe.start_link(),
172172
:ok <- Mongo.PasswordSafe.set_password(pid, value) do
173173
opts
174174
|> Keyword.put(:password, "*****")

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ defmodule Mongodb.Mixfile do
3737
{:patch, "~> 0.12.0", only: [:dev, :test]},
3838
{:jason, "~> 1.3", only: [:dev, :test]},
3939
{:credo, "~> 1.7.0", only: [:dev, :test], runtime: false},
40-
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
40+
{:ex_doc, "== 0.24.1", only: :dev, runtime: false}
4141
]
4242
end
4343

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
55
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
66
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
7-
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
7+
"ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"},
88
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
99
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
1010
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},

test/mongo/password_safe_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule Mongo.PasswordSafeTest do
77

88
test "encrypted password" do
99
pw = "my-secret-password"
10-
{:ok, pid} = PasswordSafe.new()
10+
{:ok, pid} = PasswordSafe.start_link()
1111
PasswordSafe.set_password(pid, pw)
1212
%{key: _key, pw: enc_pw} = :sys.get_state(pid)
1313
assert enc_pw != pw

0 commit comments

Comments
 (0)