`Stream.chunk_every/4` equivalent for Flow

Context:
While practicing optimizing some exercises, I got stuck this morning trying to convert a piece of `Stream`-based code to Flow.

Here is the code:

    counter = fn x, acc -> Map.update(acc, x, 1, &(&1 + 1)) end

    # Create a stream for processing large files
    "your_filename.ext"
    |> File.stream!()
    # Normalize every word
    |> Stream.map(&String.downcase/1)
    |> Stream.map(&String.replace(&1, ~r"[^a-z0-9]", " "))
    # Split on spaces and remove empty words
    |> Stream.flat_map(&String.split(&1, " ", trim: true))
    # Get every sequence of 3 words in the text
    |> Stream.chunk_every(3, 1, :discard)
    # Join the sequences
    |> Stream.map(&Enum.join(&1, " "))
    # Count occurrences of each 3-word sequence
    |> Enum.reduce(%{}, counter)
    # Display only the 10 most used sequences
    |> Map.to_list()
    |> Enum.sort_by(&elem(&1, 1), :desc)
    |> Enum.take(10)
    |> IO.inspect()

    # Example of output with http://www.gutenberg.org/cache/epub/2009/pg2009.txt :
    # [
    #   {"of the same", 320},
    #   {"the same species", 130},
    #   {"conditions of life", 125},
    #   {"in the same", 117},
    #   {"of natural selection", 111},
    #   {"from each other", 104},
    #   {"species of the", 102},
    #   {"on the other", 89},
    #   {"the other hand", 81},
    #   {"the case of", 78}
    # ]

In short, this piece of code retrieves all the 3-word sequences of a text and displays the 10 that appear the most.
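
To make the key step concrete, here is a minimal sketch of what `Stream.chunk_every(3, 1, :discard)` does on a tiny word list. The sample words are made up for illustration and are not taken from the file above.

```elixir
# Hypothetical sample input, standing in for the words of a real text
words = ~w(on the origin of species)

# Window size 3, step 1: a sliding window over the words.
# :discard drops the trailing windows that have fewer than 3 words.
trigrams =
  words
  |> Stream.chunk_every(3, 1, :discard)
  |> Enum.map(&Enum.join(&1, " "))

IO.inspect(trigrams)
# => ["on the origin", "the origin of", "origin of species"]
```

Each window overlaps the previous one by two words, so every sequence depends on the order of its neighbours in the stream.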

Problem:
I can’t find the equivalent of the line `Stream.chunk_every(3, 1, :discard)` for Flow.

Here is my current code:

    Mix.install([{:flow, "~> 1.2"}])

    counter = fn x, acc -> Map.update(acc, x, 1, &(&1 + 1)) end

    "your_filename.ext"
    |> File.stream!(read_ahead: 100_000)
    |> Flow.from_enumerable()
    |> Flow.map(&String.downcase/1)
    |> Flow.map(&String.replace(&1, ~r"[^a-z0-9]", " "))
    |> Flow.flat_map(&String.split(&1, " ", trim: true))
    # |> Stream.chunk_every(3, 1, :discard) <= ????????
    |> Flow.map(&Enum.join(&1, " "))
    |> Flow.partition()
    |> Flow.reduce(&Map.new/0, counter)
    |> Enum.to_list()
    |> Enum.sort_by(&elem(&1, 1), :desc)
    |> Enum.take(10)
    |> IO.inspect()
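
As far as I understand, part of the difficulty is that a sliding window needs the words in order, while parallel stages each see only their own batch of events. The sketch below does not use Flow at all: it only simulates that situation with plain `Enum` calls, and the split into two batches is a hypothetical stand-in for how work might be distributed.

```elixir
# Hypothetical sample input
words = ~w(on the origin of species)

# Sliding windows over the whole ordered list
whole = Enum.chunk_every(words, 3, 1, :discard)

# Simulated batching: pretend two independent stages each got one batch
batches = Enum.chunk_every(words, 3)

# Each "stage" can only build windows from its own batch
per_batch = Enum.flat_map(batches, &Enum.chunk_every(&1, 3, 1, :discard))

IO.inspect(length(whole))     # => 3
IO.inspect(length(per_batch)) # => 1
```

The windows that span a batch boundary ("the origin of" and "origin of species") are lost in the simulated version, which is why I suspect a naive per-stage chunking would give wrong counts.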