Coroutines provides combine, zip and flattenMerge operators is used to combine emissions from multiple flows
Combine
Combine operator takes the latest emission from two flows and gives result
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { // gives 1..3 in every 300ms val f1 = flowOf(1,2,3).onEach { delay(300) } val f2 = flowOf("x", "y", "z").onEach { delay(400) } val startTime = System.currentTimeMillis() f1.combine(f2) { num, str -> "$num -> $str" } .collect { result -> println("$result at ${System.currentTimeMillis() - startTime} ms from start") } } /** 1 -> x at 424 ms from start 2 -> x at 627 ms from start 2 -> y at 826 ms from start 3 -> y at 927 ms from start 3 -> z at 1226 ms from start */
Zip -
Let's take an example as above. Each time emission occurs, zip operators waits for emission from other flow , when it occurs zip provide results in lambda expression as numbers and letters
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { val f1 = flowOf(1,2,3).onEach { delay(300) } val f2 = flowOf("x", "y", "z").onEach { delay(400) } val startTime = System.currentTimeMillis() f1.zip(f2) { num, str -> "$num -> $str" } .collect { result -> println("$result at ${System.currentTimeMillis() - startTime} ms from start") } } /* 1 -> x at 437 ms from start 2 -> y at 837 ms from start 3 -> z at 1239 ms from start */
it will stop execution when one of the flow is completed
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit>{ val f1 = (1..4).asFlow() val f2 = flowOf("Hi", "Hello", ) f1.zip(f2){ a,b -> "$a -> $b"} .collect{ println(it) } } /* 1 -> Hi 2 -> Hello */
flattenMerge
It executes them as single flow ,it doesn't combines , it will not stop execution when one of the flow is completed (But zip operator does)
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { val f1 = flowOf(1,2,3,4).onEach{ delay(200)} val f2 = flowOf("H","O","L","A").onEach{ delay(400)} val startTime = System.currentTimeMillis() flowOf(f1,f2).flattenMerge().collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") } } /** * 1 at 238 ms from start H at 438 ms from start 2 at 438 ms from start 3 at 639 ms from start O at 838 ms from start 4 at 839 ms from start L at 1239 ms from start A at 1640 ms from start */
Top comments (0)