ca1989, July 17, 2023, 11:29am (#1)

Today I started studying Flow, and I have some questions. Please keep in mind that this is just example code; I specifically want to use Flow.
So, the task is:

- Read a text file
- Remove empty lines and newlines
- Count the words
- Generate a comma-separated line with each word and its count

This is what I’ve done so far:
```elixir
def count_words(filepath) do
  File.stream!(filepath)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split(&1, " "))
  |> Flow.reject(&(&1 == "\n"))
  |> Flow.map(&String.replace(&1, "\n", ""))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
  |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
end
```

My questions:
- Is it OK to chain multiple Flow operations (reject, map, …), or should I put everything in the reducer function?
- IIUC, the last Enum.map is not executed in parallel, but if I try to use Flow.map I get this error:

  ```
  ** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
  ```

  which is quite informative, but I still don’t get it.

Cheers!
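For comparison, here is a minimal sequential sketch of the same task using only the standard library; it can serve as a reference when checking the Flow version's output on a small file. It makes two assumptions that differ from the code above: the function takes any enumerable of lines (so `File.stream!(filepath)` or a plain list both work), and it swaps in `String.split/1`, which splits on any whitespace and drops empty strings, instead of splitting on `" "` and then rejecting or stripping `"\n"`. The module name `WordCountBaseline` is hypothetical.

```elixir
defmodule WordCountBaseline do
  # Hypothetical sequential baseline for the word-count task (no Flow involved).
  # `lines` can be any enumerable of strings, e.g. File.stream!(path) or a list.
  def count_words(lines) do
    lines
    # String.split/1 splits on any whitespace and discards empty strings,
    # so blank lines and trailing "\n" characters never become "words"
    |> Stream.flat_map(&String.split/1)
    # Build %{word => count} in a single pass
    |> Enum.frequencies()
    # One "word,count" line per distinct word (map order, not sorted)
    |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
  end
end
```

Usage would look like `File.stream!("input.txt") |> WordCountBaseline.count_words()` (the file name is made up). The same `&String.split/1` capture should also work as the function passed to `Flow.flat_map/2` in the Flow pipeline.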
NobbZ, July 17, 2023, 3:34pm (#2)

Yes.
 Putting everything in the reducer would create a flow that slightly shuffles your elements and then effectively reduces serially.
   
> ca1989:
> IIUC The last Enum.map is not executed in parallel, but if I try to use Flow.map I got this error: `** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)` which is quite informative, but still don’t get it
     Because there are no events anymore after the reduce, you just have a single value that was reduced into.
          
ca1989, July 18, 2023, 5:40am (#3)

OK, thanks.
I am quite embarrassed by that. For some reason, I was thinking in terms of enumerables, when the Flow functions work on flows (see the Flow v1.2.4 docs).
(Sorry for the very silly questions; I’m studying Flow alone and have no one to ask silly questions to…)
 Cheers!
NobbZ, July 18, 2023, 6:08am (#4)

Yes, the Flow functions operate on flows, but, just like the Enum functions with enumerables, they don’t necessarily return one.
          
ca1989, July 18, 2023, 7:16am (#5)

Now my question is:
  What is the correct way to transform the result of that reduce in a parallel way?    Should I create another Flow from the result of the reduce?
 Cheers
          
       Flow.reduce returns a flow, so you should be able to chain further Flow functions.
          
NobbZ, July 18, 2023, 8:19am (#7)

Hmmm… It seems I have totally misunderstood the use of Flow.reduce/3 so far, then…
          
ca1989, July 18, 2023, 8:33am (#8)

Flow.reduce does indeed return a flow, but I can’t call Flow.map on it, because I get the error I posted in my original post.
 Following the suggestion in the error message, I was able to use on_trigger on the flow, to map over the result of Flow.reduce:
```elixir
def count_words(filepath, :flow) do
  File.stream!(filepath)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split(&1, " "))
  |> Flow.reject(&(&1 == "\n"))
  |> Flow.map(&String.replace(&1, "\n", ""))
  # replace commas so they cannot break the "word,count" output line
  |> Flow.map(&String.replace(&1, ",", " "))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
  # on_trigger receives each partition's accumulator (a %{word => count} map)
  # and returns {events_to_emit, new_accumulator}
  |> Flow.on_trigger(fn wordcount ->
    {Enum.map(wordcount, fn {w, c} -> "#{w},#{c}\n" end), wordcount}
  end)
  |> Enum.into([])
end
```

IIUC, the Enum.map inside on_trigger runs sequentially over each partition’s accumulated map, while the on_trigger callbacks of the different partitions run in parallel.
 Thank you
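To make the partition, reduce and on_trigger steps more concrete, here is a rough model built only on the standard library. It is a simplification under stated assumptions, not how Flow is implemented: words are routed to partitions with `:erlang.phash2/2`, each partition is reduced in its own task via `Task.async_stream/2`, and the `on_trigger` callback (the same `fn acc -> {events, new_acc} end` shape as in the post above) turns each partition's accumulator back into events. In this model, right after the reduce there is only an accumulator per partition and no events, which is roughly why `map/2` is rejected at that point; the `Enum.map` inside the callback is sequential within one partition, while the partitions run concurrently. `PartitionedCount` and its `partitions` argument are hypothetical.

```elixir
defmodule PartitionedCount do
  # Hypothetical, simplified model of
  #   Flow.partition |> Flow.reduce |> Flow.on_trigger
  # using only the standard library. Not Flow's actual implementation.
  #
  # words:       a list of words
  # partitions:  number of partitions (loosely like Flow's :stages option)
  # on_trigger:  fn acc -> {events, new_acc} end
  def run(words, partitions, on_trigger) do
    words
    # Route each word by its hash, so equal words land in the same partition
    |> Enum.group_by(fn word -> :erlang.phash2(word, partitions) end)
    |> Map.values()
    # Reduce every partition in its own process, concurrently
    |> Task.async_stream(fn partition_words ->
      acc =
        Enum.reduce(partition_words, %{}, fn word, acc ->
          Map.update(acc, word, 1, &(&1 + 1))
        end)

      # At this point the partition holds only an accumulator, no events.
      # The trigger callback converts it back into a list of events;
      # anything it does (e.g. Enum.map) is sequential within this partition.
      {events, _new_acc} = on_trigger.(acc)
      events
    end)
    # Concatenate the events of all partitions; do not rely on their order
    |> Enum.flat_map(fn {:ok, events} -> events end)
  end
end
```

For example, `PartitionedCount.run(~w(a b a c b a), 2, fn acc -> {Enum.map(acc, fn {w, c} -> "#{w},#{c}\n" end), acc} end)` yields the lines `"a,3\n"`, `"b,2\n"` and `"c,1\n"` in some order, so sort the result if you need a stable order.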
I’m here because I’m working through the book Concurrent Data Processing in Elixir. In the section on Flow, there is an error in the code. On page 102, this is suggested:
```elixir
|> Flow.reject(&(&1.type == "closed"))
|> Flow.partition(key: {:key, :country})
|> Flow.group_by(& &1.country)
|> Flow.map(fn {country, data} -> {country, Enum.count(data)} end)
|> Enum.to_list()
```

Running this code results in a similar error:
```
iex(4)> Airports.open_airports
** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
    (flow 1.2.4) lib/flow.ex:1976: Flow.add_mapper/3
    (airports 0.1.0) lib/airports.ex:26: Airports.open_airports/0
    iex:4: (file)
```

For anyone else who arrived here by the same path, making this change will let you continue unabated:
```elixir
|> Flow.reject(&(&1.type == "closed"))
|> Flow.partition(key: {:key, :country})
|> Flow.group_by(& &1.country)
# `count` is the group_by accumulator: %{country => [airport, ...]}
|> Flow.on_trigger(fn count ->
  {Enum.map(count, fn {country, data} -> {country, Enum.count(data)} end), count}
end)
|> Enum.to_list()
```
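A sequential counterpart of the corrected pipeline can help confirm that the `on_trigger/2` version returns the right counts. This sketch assumes each airport is a map with `:type` and `:country` keys, as the dot access in the book's snippet implies; the module name `OpenAirportsCheck` is hypothetical.

```elixir
defmodule OpenAirportsCheck do
  # Hypothetical sequential cross-check for the Flow pipeline above.
  # Assumes each airport is a map with :type and :country keys.
  def count_by_country(airports) do
    airports
    |> Enum.reject(&(&1.type == "closed"))
    # Roughly the shape Flow.group_by/2 accumulates: country => list of airports
    |> Enum.group_by(& &1.country)
    # The step the on_trigger/2 callback performs in the Flow version
    |> Enum.map(fn {country, data} -> {country, Enum.count(data)} end)
  end
end
```

After the `Enum.reject/2` step, `Enum.frequencies_by(& &1.country)` (Elixir 1.10+) would collapse the group-and-count steps into one call, returning a map rather than a list of tuples.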