Skip to content

Commit 9e4d569

Browse files
authored
fix: added support for read preference specified by the URL (#207)
1 parent be592e3 commit 9e4d569

File tree

8 files changed

+248
-117
lines changed

8 files changed

+248
-117
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,15 +798,15 @@ a simple map, supporting the following keys:
798798

799799
* `:mode`, possible values: `:primary`, `:primary_preferred`, `:secondary`, `:secondary_preferred` and `:nearest`
800800
* `:max_staleness_ms`, the maxStaleness value in milliseconds
801-
* `:tag_sets`, the set of tags, for example: `[dc: "west", usage: "production"]`
801+
* `:tags`, the set of tags, for example: `[dc: "west", usage: "production"]`
802802

803803
The driver selects the server using the read preference.
804804

805805
```elixr
806806
prefs = %{
807807
mode: :secondary,
808808
max_staleness_ms: 120_000,
809-
tag_sets: [dc: "west", usage: "production"]
809+
tags: [dc: "west", usage: "production"]
810810
}
811811
812812
Mongo.find_one(top, "dogs", %{name: "Oskar"}, read_preference: prefs)

lib/mongo/read_preference.ex

Lines changed: 77 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,134 +4,141 @@ defmodule Mongo.ReadPreference do
44
@moduledoc ~S"""
55
Determines which servers are considered suitable for read operations
66
7-
A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
7+
A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`.
88
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
9-
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
9+
If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
1010
If hedge is set, it configures how server hedged reads are used.
1111
1212
The default mode is `:primary`.
13-
The default tag_sets is a list with an empty tag set: [{}].
13+
The default tags is a list with an empty tag set: [{}].
1414
The default max_staleness_ms is unset.
1515
The default hedge is unset.
1616
1717
## mode
1818
1919
* `:primary` Only an available primary is suitable.
20-
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
20+
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable.
2121
* `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
2222
but only eligible secondaries are suitable.
2323
* `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
2424
Otherwise, when there are no eligible secondaries, the primary is suitable.
2525
* `:nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable.
2626
2727
"""
28-
@type t :: %{
29-
mode:
30-
:primary
31-
| :secondary
32-
| :primary_preferred
33-
| :secondary_preferred
34-
| :nearest,
35-
tag_sets: [%{String.t() => String.t()}],
36-
max_staleness_ms: non_neg_integer,
37-
hedge: BSON.document()
38-
}
3928

4029
@primary %{
4130
mode: :primary,
42-
tag_sets: [],
31+
tags: [],
4332
max_staleness_ms: 0
4433
}
4534

46-
def primary(map \\ nil)
35+
@doc """
36+
Merge default values to the read preferences and converts deprecated tag_sets to tags
37+
"""
38+
def merge_defaults(%{tag_sets: tags} = map) do
39+
map =
40+
map
41+
|> Map.delete(:tag_sets)
42+
|> Map.put(:tags, tags)
43+
44+
Map.merge(@primary, map)
45+
end
4746

48-
def primary(map) when is_map(map) do
47+
def merge_defaults(map) when is_map(map) do
4948
Map.merge(@primary, map)
5049
end
5150

52-
def primary(_), do: @primary
51+
def merge_defaults(_other) do
52+
@primary
53+
end
5354

5455
@doc """
5556
Add read preference to the cmd
5657
"""
5758
def add_read_preference(cmd, opts) do
5859
case Keyword.get(opts, :read_preference) do
59-
nil -> cmd
60-
pref -> cmd ++ ["$readPreference": pref]
60+
nil ->
61+
cmd
62+
63+
pref ->
64+
cmd ++ ["$readPreference": pref]
6165
end
6266
end
6367

6468
@doc """
65-
From the specs:
66-
67-
Use of slaveOk
68-
69-
There are two usages of slaveOK:
70-
71-
* A driver query parameter that predated read preference modes and tag set lists.
72-
* A wire protocol flag on OP_QUERY operations
73-
69+
Converts the preference to the mongodb format for replica sets
7470
"""
75-
def slave_ok(%{:mode => :primary}) do
76-
%{:mode => :primary}
71+
def to_replica_set(%{:mode => :primary}) do
72+
%{mode: :primary}
7773
end
7874

79-
def slave_ok(config) do
75+
def to_replica_set(config) do
8076
mode =
8177
case config[:mode] do
82-
:primary_preferred -> :primaryPreferred
83-
:secondary_preferred -> :secondaryPreferred
84-
other -> other
85-
end
78+
:primary_preferred ->
79+
:primaryPreferred
8680

87-
filter_nils(mode: mode, tag_sets: config[:tag_sets])
88-
end
81+
:secondary_preferred ->
82+
:secondaryPreferred
8983

90-
##
91-
# Therefore, when sending queries to a mongos, the following rules apply:
92-
#
93-
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
94-
def mongos(%{mode: :primary}) do
95-
nil
96-
end
84+
other ->
85+
other
86+
end
9787

98-
# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
99-
def mongos(%{mode: :secondary} = config) do
100-
transform(config)
101-
end
88+
case config[:tags] do
89+
[] ->
90+
%{mode: mode}
10291

103-
# For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
104-
def mongos(%{mode: :primary_preferred} = config) do
105-
transform(config)
106-
end
92+
nil ->
93+
%{mode: mode}
10794

108-
# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
109-
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
110-
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
111-
def mongos(%{mode: :secondary_preferred} = config) do
112-
transform(config)
95+
tags ->
96+
%{mode: mode, tags: [tags]}
97+
end
11398
end
11499

115-
# For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
116-
def mongos(%{mode: :nearest} = config) do
117-
transform(config)
100+
@doc """
101+
Converts the preference to the mongodb format for mongos
102+
"""
103+
def to_mongos(%{mode: :primary}) do
104+
nil
118105
end
119106

120-
defp transform(config) do
107+
# for the others we should use the read preferences
108+
def to_mongos(config) do
121109
mode =
122110
case config[:mode] do
123-
:primary_preferred -> :primaryPreferred
124-
:secondary_preferred -> :secondaryPreferred
125-
other -> other
111+
:primary_preferred ->
112+
:primaryPreferred
113+
114+
:secondary_preferred ->
115+
:secondaryPreferred
116+
117+
other ->
118+
other
126119
end
127120

128121
max_staleness_seconds =
129122
case config[:max_staleness_ms] do
130-
i when is_integer(i) -> div(i, 1000)
131-
nil -> nil
123+
i when is_integer(i) ->
124+
div(i, 1000)
125+
126+
nil ->
127+
nil
128+
end
129+
130+
read_preference =
131+
case config[:tags] do
132+
[] ->
133+
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
134+
135+
nil ->
136+
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
137+
138+
tags ->
139+
%{mode: mode, tags: [tags], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
132140
end
133141

134-
[mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]]
135-
|> filter_nils()
142+
filter_nils(read_preference)
136143
end
137144
end

lib/mongo/topology.ex

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ defmodule Mongo.Topology do
373373
# checkout a new session
374374
#
375375
def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
376+
opts = merge_read_preferences(opts, state.opts)
377+
376378
case TopologyDescription.select_servers(topology, read_write_type, opts) do
377379
:empty ->
378380
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts})
@@ -398,6 +400,8 @@ defmodule Mongo.Topology do
398400
end
399401

400402
def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
403+
opts = merge_read_preferences(opts, state.opts)
404+
401405
case TopologyDescription.select_servers(topology, read_write_type, opts) do
402406
:empty ->
403407
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts})
@@ -579,4 +583,14 @@ defmodule Mongo.Topology do
579583
defp fetch_arbiters(state) do
580584
Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end)
581585
end
586+
587+
defp merge_read_preferences(opts, url_opts) do
588+
case Keyword.get(url_opts, :read_preference) do
589+
nil ->
590+
opts
591+
592+
read_preference ->
593+
Keyword.put_new(opts, :read_preference, read_preference)
594+
end
595+
end
582596
end

0 commit comments

Comments
 (0)