Skip to content

Commit dc79914

Browse files
committed
Add :limit option to Task.yield_many/2
1 parent b86b8ba commit dc79914

File tree

2 files changed

+50
-9
lines changed

2 files changed

+50
-9
lines changed

lib/elixir/lib/task.ex

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,8 @@ defmodule Task do
11091109
* `{:ok, term}` if the task has successfully reported its
11101110
result back in the given time interval
11111111
* `{:exit, reason}` if the task has died
1112-
* `nil` if the task keeps running past the timeout
1112+
* `nil` if the task keeps running, either because a limit
1113+
has been reached or past the timeout
11131114
11141115
Check `yield/2` for more information.
11151116
@@ -1162,6 +1163,10 @@ defmodule Task do
11621163
The second argument is either a timeout or options, which defaults
11631164
to this:
11641165
1166+
* `:limit` - the maximum amount of tasks to wait for.
1167+
If the limit is reached before the timeout, this function
1168+
returns immediately without triggering the `:on_timeout` behaviour
1169+
11651170
* `:timeout` - the maximum amount of time (in milliseconds or `:infinity`)
11661171
each task is allowed to execute for. Defaults to `5000`.
11671172
@@ -1173,7 +1178,11 @@ defmodule Task do
11731178
* `:kill_task` - the task that timed out is killed.
11741179
"""
11751180
@spec yield_many([t], timeout) :: [{t, {:ok, term} | {:exit, term} | nil}]
1176-
@spec yield_many([t], timeout: timeout, on_timeout: :nothing | :ignore | :kill_task) ::
1181+
@spec yield_many([t],
1182+
limit: pos_integer(),
1183+
timeout: timeout,
1184+
on_timeout: :nothing | :ignore | :kill_task
1185+
) ::
11771186
[{t, {:ok, term} | {:exit, term} | nil}]
11781187
def yield_many(tasks, opts \\ [])
11791188

@@ -1182,9 +1191,6 @@ defmodule Task do
11821191
end
11831192

11841193
def yield_many(tasks, opts) when is_list(opts) do
1185-
on_timeout = Keyword.get(opts, :on_timeout, :nothing)
1186-
timeout = Keyword.get(opts, :timeout, 5_000)
1187-
11881194
refs =
11891195
Map.new(tasks, fn %Task{ref: ref, owner: owner} = task ->
11901196
if owner != self() do
@@ -1194,6 +1200,9 @@ defmodule Task do
11941200
{ref, nil}
11951201
end)
11961202

1203+
on_timeout = Keyword.get(opts, :on_timeout, :nothing)
1204+
timeout = Keyword.get(opts, :timeout, 5_000)
1205+
limit = Keyword.get(opts, :limit, map_size(refs))
11971206
timeout_ref = make_ref()
11981207

11991208
timer_ref =
@@ -1202,16 +1211,17 @@ defmodule Task do
12021211
end
12031212

12041213
try do
1205-
yield_many(map_size(refs), refs, timeout_ref, timer_ref)
1214+
yield_many(limit, refs, timeout_ref, timer_ref)
12061215
catch
12071216
{:noconnection, reason} ->
12081217
exit({reason, {__MODULE__, :yield_many, [tasks, timeout]}})
12091218
else
1210-
refs ->
1219+
{timed_out?, refs} ->
12111220
for task <- tasks do
12121221
value =
12131222
with nil <- Map.fetch!(refs, task.ref) do
12141223
case on_timeout do
1224+
_ when not timed_out? -> nil
12151225
:nothing -> nil
12161226
:kill_task -> shutdown(task, :brutal_kill)
12171227
:ignore -> ignore(task)
@@ -1226,7 +1236,7 @@ defmodule Task do
12261236
defp yield_many(0, refs, timeout_ref, timer_ref) do
12271237
timer_ref && Process.cancel_timer(timer_ref)
12281238
receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
1229-
refs
1239+
{false, refs}
12301240
end
12311241

12321242
defp yield_many(limit, refs, timeout_ref, timer_ref) do
@@ -1243,7 +1253,7 @@ defmodule Task do
12431253
end
12441254

12451255
^timeout_ref ->
1246-
refs
1256+
{true, refs}
12471257
end
12481258
end
12491259

lib/elixir/test/elixir/task_test.exs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,37 @@ defmodule TaskTest do
601601
[{task1, {:ok, :result}}, {task2, nil}, {task3, {:exit, :normal}}]
602602
end
603603

604+
test "returns results from multiple tasks with limit" do
605+
task1 = %Task{ref: make_ref(), owner: self(), pid: nil, mfa: {__MODULE__, :test, 1}}
606+
task2 = %Task{ref: make_ref(), owner: self(), pid: nil, mfa: {__MODULE__, :test, 1}}
607+
task3 = %Task{ref: make_ref(), owner: self(), pid: nil, mfa: {__MODULE__, :test, 1}}
608+
609+
send(self(), {task1.ref, :result})
610+
send(self(), {:DOWN, task3.ref, :process, self(), :normal})
611+
612+
assert Task.yield_many([task1, task2, task3], limit: 1, timeout: :infinity) ==
613+
[{task1, {:ok, :result}}, {task2, nil}, {task3, nil}]
614+
615+
assert Task.yield_many([task2, task3], limit: 1, timeout: :infinity) ==
616+
[{task2, nil}, {task3, {:exit, :normal}}]
617+
end
618+
619+
test "returns results from multiple tasks with limit and on timeout" do
620+
Process.flag(:trap_exit, true)
621+
task1 = Task.async(fn -> Process.sleep(:infinity) end)
622+
task2 = Task.async(fn -> :done end)
623+
624+
assert Task.yield_many([task1, task2], timeout: :infinity, on_timeout: :kill_task, limit: 1) ==
625+
[{task1, nil}, {task2, {:ok, :done}}]
626+
627+
assert Process.alive?(task1.pid)
628+
629+
assert Task.yield_many([task1], timeout: 0, on_timeout: :kill_task, limit: 1) ==
630+
[{task1, nil}]
631+
632+
refute Process.alive?(task1.pid)
633+
end
634+
604635
test "returns results on infinity timeout" do
605636
task1 = %Task{ref: make_ref(), owner: self(), pid: nil, mfa: {__MODULE__, :test, 1}}
606637
task2 = %Task{ref: make_ref(), owner: self(), pid: nil, mfa: {__MODULE__, :test, 1}}

0 commit comments

Comments
 (0)