Structured concurrency With Kotlin Coroutines
Vadims Savjolovs Product Engineer @ TransferWise @savjolovs@savjolovs @savjolovs
// Agenda Theory ● Coroutines ● Suspending functions ● Asynchronous Flows ● Channels Practice Business lookup feature in TransferWise app for Android
// What are Kotlin Coroutines? Lightweight threads? Programming paradigm? New reactive framework? ✅ ✅ ✅
fun main() { GlobalScope.launch { delay(1000L) println("World!") } println("Hello,") Thread.sleep(2000L) } // Coroutine are like threads In a context of a CoroutineScope launch is a coroutine builder that launches a new coroutine Print “Hello, ” Keep JVM alive for 2 more seconds suspending function that does not block a thread, but suspends coroutine
val counter = AtomicLong() for (i in 1..1_000_000L) { GlobalScope.launch { counter.incrementAndGet() } } println(counter) val counter = AtomicLong() for (i in 1..1_000_000L) { thread(start = true) { counter.incrementAndGet() } } println(counter) // Coroutines are cheap ~1.5 seconds~63 seconds
// Suspending functions suspend fun getFromNetwork(): String GlobalScope.launch { val data = getSomeData() updateUi(data) } suspend fun getSomeData(): String { val someString = getFromNetwork() val id = getIdFromNetwork() val res = process(someString, id) return res } // Dart language Future<String> getSomeData() async { final someString = await getFromNetwork(); final id = await getIdFromNetwork(); final res = process(someString, id); return Future.value(res); } // Dart language getSomeData().then((data) { updateUi(data); });
// Main-safety viewModelScope.launch(Dispatchers.Main) { val someString = getFromNetwork() liveData.value = someString } suspend fun getFromNetwork(): String { return URL("https://google.com").readText() } E/AndroidRuntime: FATAL EXCEPTION: main Process: com.transferwise.android android.os.NetworkOnMainThreadException Redundant ‘suspend’ modifier suspend fun getFromNetwork(): String { return withContext(Dispatchers.IO) { return@withContext URL("https://google.com").readText() } }
GlobalScope { getFromNetwork() } { getIdFromNetwork() } myScope getSomeData() // Parallel decomposition suspend fun getSomeData(): String { val someString = GlobalScope.async { getFromNetwork() } val id = GlobalScope.async { getIdFromNetwork() } val res = process(someString.await(), id.await()) return res } warning job.cancel() val job = myScope.launch { val someData = getSomeData() } LEAK
GlobalScope // Structured concurrency suspend fun getSomeData(): String { return coroutineScope { val someString = async { getFromNetwork() } val id = async { getIdFromNetwork() } val res = process(someString.await(), id.await()) return@coroutineScope res } } job.cancel() val job = myScope.launch { val someData = getSomeData() } myScope getSomeData() { getFromNetwork() } { getIdFromNetwork() }
// Asynchronous Flow fun numberFlow(): Flow<Int> = flow { for (i in 1..100) { delay(1000) emit(i) } } myScope.launch { numberFlow() .filter { it % 2 == 0 } .map { it * 2 } .take(10) .collect { number -> println(number) } }
// Channels myScope.launch { channel.send("1") delay(1000L) channel.send("2") } myScope.launch { channel.consumeEach { println(it) } } coroutine 1 coroutine 2 send receive channel
// What did we covered? Coroutines ● Very lightweight threads ● Structured concurrency ● Start with launch{} or async{} ● Call suspending functions Suspending functions ● Not blocking thread ● Suspends a coroutine ● For a long-running tasks Asynchronous Flows ● Data streams ● Transformation operators Channels ● Communication tool for coroutines ● Hot data streams
// Business lookup
// Lookup Service UI (Fragment) ViewModel Interactor Repository Network Service @GET("v2/registry/search") suspend fun businessLookup( @Query("searchTerms") query: String, @Query("countryCode") countryCode: String ): NetworkResponse<List<LookupBusinessResponse>, ErrorsResponse>
// Lookup Repository UI (Fragment) ViewModel Interactor Repository Network Service @Inject val lookupService: LookupService suspend fun businessLookup(query: String, countryCode: String) = withContext(Dispatchers.IO) { val response = lookupService.businessLookup(query, countryCode) when (response) { is NetworkResponse.Success -> return@withContext mapper.map(response) is NetworkResponse.Error -> return@withContext errorMapper.map(response) } }
// Lookup Interactor UI (Fragment) ViewModel Interactor Repository Network Service @Inject val lookupRepository: LookupRepository suspend operator fun invoke( query: String, countryCode: String ): Result<List<LookupBusiness>, ErrorMessage> { return lookupRepository.businessLookup(query, countryCode) } lookupInteractor() instead of lookupInteractor.invoke()
// Lookup ViewModel UI (Fragment) ViewModel Interactor Repository Network Service @Inject val lookupInteractor: LookupInteractor fun searchBusiness(query: String) { viewState.value = ViewState.Loading viewModelScope.launch(Dispatchers.Main) { val result = lookupInteractor(query, countryCode) when (result) { is Result.Success -> viewState.value = ViewState.SearchResults(result.entity) is Result.Failure -> viewState.value = ViewState.ErrorState(result.failure) } } } warning
// Lookup ViewModel UI (Fragment) ViewModel Interactor Repository Network Service @Inject val lookupInteractor: LookupInteractor private fun searchBusiness(query: String) GET: v2/registry/search T Tr Tra Tran Trans Transf Transfe Transfer Transferw Transferwi Transferwis Transferwise warning
// Lookup ViewModel UI (Fragment) ViewModel Interactor Repository Network Service private val сhannel = Channel<String>() init { viewModelScope.launch(Dispatchers.Main) { сhannel.consumeAsFlow().debounce(500L).collect { query -> searchBusiness(query) } } } fun search(query: String) { сhannel.offer(query) }
// Lookup ViewModel UI (Fragment) ViewModel Interactor Repository Network Service private val сhannel = Channel<String>() init { viewModelScope.launch(Dispatchers.Main) { сhannel.consumeAsFlow().debounce(500L).collect { query -> searchBusiness(query) } } } fun search(query: String) { сhannel.offer(query) } private suspend fun searchBusiness(query: String) { viewState.value = ViewState.Loading val result = lookupInteractor(query, countryCode) when (result) { is Result.Success -> viewState.value = ViewState.SearchResults(result.entity) is Result.Failure -> viewState.value = ViewState.ErrorState(result.failure) }
// Lookup Fragment UI (Fragment) ViewModel Interactor Repository Network Service searchView.addTextChangedListener { override fun afterTextChanged(s: Editable?) { viewModel.search(s.toString()) } } viewModel.viewState.observe(this, Observer { result -> return@Observer when (result) { is ViewState.Loading -> showLoading() is ViewState.SearchResults -> showResult(result.businessList) is ViewState.ErrorState -> showError(result.error) } })
// Links Official docs Kotlin Flows and Coroutines - Blog Post Coroutine Context and Scope - Blog Post Structured Concurrency - Blog Post Blocking threads, suspending coroutines - Blog Post Kotlin: Diving in to Coroutines and Channels - Blog Post

Structured concurrency with Kotlin Coroutines

  • 1.
  • 2.
    Vadims Savjolovs Product Engineer@ TransferWise @savjolovs@savjolovs @savjolovs
  • 3.
    // Agenda Theory ● Coroutines ●Suspending functions ● Asynchronous Flows ● Channels Practice Business lookup feature in TransferWise app for Android
  • 4.
    // What areKotlin Coroutines? Lightweight threads? Programming paradigm? New reactive framework? ✅ ✅ ✅
  • 5.
    fun main() { GlobalScope.launch{ delay(1000L) println("World!") } println("Hello,") Thread.sleep(2000L) } // Coroutine are like threads In a context of a CoroutineScope launch is a coroutine builder that launches a new coroutine Print “Hello, ” Keep JVM alive for 2 more seconds suspending function that does not block a thread, but suspends coroutine
  • 6.
    val counter =AtomicLong() for (i in 1..1_000_000L) { GlobalScope.launch { counter.incrementAndGet() } } println(counter) val counter = AtomicLong() for (i in 1..1_000_000L) { thread(start = true) { counter.incrementAndGet() } } println(counter) // Coroutines are cheap ~1.5 seconds~63 seconds
  • 7.
    // Suspending functions suspendfun getFromNetwork(): String GlobalScope.launch { val data = getSomeData() updateUi(data) } suspend fun getSomeData(): String { val someString = getFromNetwork() val id = getIdFromNetwork() val res = process(someString, id) return res } // Dart language Future<String> getSomeData() async { final someString = await getFromNetwork(); final id = await getIdFromNetwork(); final res = process(someString, id); return Future.value(res); } // Dart language getSomeData().then((data) { updateUi(data); });
  • 8.
    // Main-safety viewModelScope.launch(Dispatchers.Main) { valsomeString = getFromNetwork() liveData.value = someString } suspend fun getFromNetwork(): String { return URL("https://google.com").readText() } E/AndroidRuntime: FATAL EXCEPTION: main Process: com.transferwise.android android.os.NetworkOnMainThreadException Redundant ‘suspend’ modifier suspend fun getFromNetwork(): String { return withContext(Dispatchers.IO) { return@withContext URL("https://google.com").readText() } }
  • 9.
    GlobalScope { getFromNetwork() } {getIdFromNetwork() } myScope getSomeData() // Parallel decomposition suspend fun getSomeData(): String { val someString = GlobalScope.async { getFromNetwork() } val id = GlobalScope.async { getIdFromNetwork() } val res = process(someString.await(), id.await()) return res } warning job.cancel() val job = myScope.launch { val someData = getSomeData() } LEAK
  • 10.
    GlobalScope // Structured concurrency suspendfun getSomeData(): String { return coroutineScope { val someString = async { getFromNetwork() } val id = async { getIdFromNetwork() } val res = process(someString.await(), id.await()) return@coroutineScope res } } job.cancel() val job = myScope.launch { val someData = getSomeData() } myScope getSomeData() { getFromNetwork() } { getIdFromNetwork() }
  • 11.
    // Asynchronous Flow funnumberFlow(): Flow<Int> = flow { for (i in 1..100) { delay(1000) emit(i) } } myScope.launch { numberFlow() .filter { it % 2 == 0 } .map { it * 2 } .take(10) .collect { number -> println(number) } }
  • 12.
    // Channels myScope.launch { channel.send("1") delay(1000L) channel.send("2") } myScope.launch{ channel.consumeEach { println(it) } } coroutine 1 coroutine 2 send receive channel
  • 13.
    // What didwe covered? Coroutines ● Very lightweight threads ● Structured concurrency ● Start with launch{} or async{} ● Call suspending functions Suspending functions ● Not blocking thread ● Suspends a coroutine ● For a long-running tasks Asynchronous Flows ● Data streams ● Transformation operators Channels ● Communication tool for coroutines ● Hot data streams
  • 14.
  • 15.
    // Lookup Service UI(Fragment) ViewModel Interactor Repository Network Service @GET("v2/registry/search") suspend fun businessLookup( @Query("searchTerms") query: String, @Query("countryCode") countryCode: String ): NetworkResponse<List<LookupBusinessResponse>, ErrorsResponse>
  • 16.
    // Lookup Repository UI(Fragment) ViewModel Interactor Repository Network Service @Inject val lookupService: LookupService suspend fun businessLookup(query: String, countryCode: String) = withContext(Dispatchers.IO) { val response = lookupService.businessLookup(query, countryCode) when (response) { is NetworkResponse.Success -> return@withContext mapper.map(response) is NetworkResponse.Error -> return@withContext errorMapper.map(response) } }
  • 17.
    // Lookup Interactor UI(Fragment) ViewModel Interactor Repository Network Service @Inject val lookupRepository: LookupRepository suspend operator fun invoke( query: String, countryCode: String ): Result<List<LookupBusiness>, ErrorMessage> { return lookupRepository.businessLookup(query, countryCode) } lookupInteractor() instead of lookupInteractor.invoke()
  • 18.
    // Lookup ViewModel UI(Fragment) ViewModel Interactor Repository Network Service @Inject val lookupInteractor: LookupInteractor fun searchBusiness(query: String) { viewState.value = ViewState.Loading viewModelScope.launch(Dispatchers.Main) { val result = lookupInteractor(query, countryCode) when (result) { is Result.Success -> viewState.value = ViewState.SearchResults(result.entity) is Result.Failure -> viewState.value = ViewState.ErrorState(result.failure) } } } warning
  • 19.
    // Lookup ViewModel UI(Fragment) ViewModel Interactor Repository Network Service @Inject val lookupInteractor: LookupInteractor private fun searchBusiness(query: String) GET: v2/registry/search T Tr Tra Tran Trans Transf Transfe Transfer Transferw Transferwi Transferwis Transferwise warning
  • 20.
    // Lookup ViewModel UI(Fragment) ViewModel Interactor Repository Network Service private val сhannel = Channel<String>() init { viewModelScope.launch(Dispatchers.Main) { сhannel.consumeAsFlow().debounce(500L).collect { query -> searchBusiness(query) } } } fun search(query: String) { сhannel.offer(query) }
  • 21.
    // Lookup ViewModel UI(Fragment) ViewModel Interactor Repository Network Service private val сhannel = Channel<String>() init { viewModelScope.launch(Dispatchers.Main) { сhannel.consumeAsFlow().debounce(500L).collect { query -> searchBusiness(query) } } } fun search(query: String) { сhannel.offer(query) } private suspend fun searchBusiness(query: String) { viewState.value = ViewState.Loading val result = lookupInteractor(query, countryCode) when (result) { is Result.Success -> viewState.value = ViewState.SearchResults(result.entity) is Result.Failure -> viewState.value = ViewState.ErrorState(result.failure) }
  • 22.
    // Lookup Fragment UI(Fragment) ViewModel Interactor Repository Network Service searchView.addTextChangedListener { override fun afterTextChanged(s: Editable?) { viewModel.search(s.toString()) } } viewModel.viewState.observe(this, Observer { result -> return@Observer when (result) { is ViewState.Loading -> showLoading() is ViewState.SearchResults -> showResult(result.businessList) is ViewState.ErrorState -> showError(result.error) } })
  • 23.
    // Links Official docs KotlinFlows and Coroutines - Blog Post Coroutine Context and Scope - Blog Post Structured Concurrency - Blog Post Blocking threads, suspending coroutines - Blog Post Kotlin: Diving in to Coroutines and Channels - Blog Post