This is my first time using Flow, and I want to make sure I’m understanding the paradigm correctly…
My understanding is that the following code will:
- Divvy out the work of mapping over the CSV lines so it runs in parallel via Flow (GenStage under the hood)
- Collect the results back into a single stream (of indeterminate order)
- Chunk that single stream into chunks of 1,000
- Run each upsert of [up to] 1,000 records concurrently in its own process (a Task)
```elixir
def import(path_to_csv) do
  path_to_csv
  |> File.stream!()
  |> CSV.parse_stream()
  |> Flow.from_enumerable()
  |> Flow.map(&do_some_work_and_set_up_desired_attrs/1)
  |> Stream.uniq_by(fn %{hash: hash} -> hash end)
  |> Stream.chunk_every(1_000)
  |> concurrent_upsert()
  |> Stream.run()
end

def concurrent_upsert(stream) do
  Task.Supervisor.async_stream_nolink(BatchImporter, stream, &upsert_batch(&1))
end

def upsert_batch(attrs_batch) do
  # do upsert
end
```
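For context, I'm assuming `BatchImporter` is a `Task.Supervisor` started under my application's supervision tree, roughly like this (the `MyApp.*` names are placeholders):

```elixir
# application.ex — BatchImporter is the named Task.Supervisor that
# concurrent_upsert/1 hands batches to; MyApp.* names are placeholders.
def start(_type, _args) do
  children = [
    MyApp.Repo,
    {Task.Supervisor, name: BatchImporter}
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
```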
I chose Flow for the mapping because it seemed like a natural fit for handling tens of thousands of rows from a CSV.
The reason I did not use Flow for the upserts is that inserting a total of 15k rows, for example, yields only 15 batches, and Flow doesn't seem designed for such a small number of items. I wanted to force each upsert into its own process, which seemed like a better fit for something like Task.Supervisor.async_stream_nolink.
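In case it helps, each batch upsert is roughly an Ecto `insert_all` with an `on_conflict` option; the repo, schema, and conflict target below are placeholders for illustration, not my real setup:

```elixir
# Rough shape of upsert_batch/1 — MyApp.Repo, MyApp.Record, and the
# :hash conflict target are placeholders, not my actual schema.
def upsert_batch(attrs_batch) do
  MyApp.Repo.insert_all(
    MyApp.Record,
    attrs_batch,
    on_conflict: {:replace_all_except, [:id, :inserted_at]},
    conflict_target: :hash
  )
end
```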
Can someone confirm if this reasoning is sound?