ASYNC CODE ON KOTLIN: RXJAVA OR/AND COROUTINES Fabio Collini
@fabioCollini linkedin.com/in/fabiocollini github.com/fabioCollini medium.com/@fabioCollini codingjam.it Android programmazione avanzata Android Developers Italia Ego slide
RxJava Coroutines OR
RxJava Coroutines AND
RxJava Coroutines-> RxJavaCoroutines -> rxCompletable, rxMaybe, rxSingle, rxObservable, rxFlowable CompletableSource.await, MaybeSource.await, MaybeSource.awaitOrDefault, MaybeSource.openSubscription, SingleSource.await, ObservableSource.awaitFirst, ObservableSource.awaitFirstOrDefault, ObservableSource.awaitFirstOrElse, ObservableSource.awaitFirstOrNull, ObservableSource.awaitLast, ObservableSource.awaitSingle, ObservableSource.openSubscription, ObservableSource.iterator github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-rx2
Single Maybe Completable Observable Flowable Cold Vs Hot Subjects suspend methods Job/Deferred Channels Actors Producers Publishers BroadcastChannels
Wrapper of a synchronous method fun myPref(): String = prefs.getString("myPref", "") fun myPref(): Single<String> = Single.fromCallable { prefs.getString("myPref", "") }1 suspend fun myPref(): String = withContext(CommonPool) { prefs.getString("myPref", "") }2 RxJavaCoroutines
Thread & error management fun myPref(): Single<String> = Single.fromCallable { prefs.getString("myPref", "") }1 suspend fun myPref(): String = withContext(CommonPool) { prefs.getString("myPref", "") }2 myPref() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { myText.text = it }, { e -> myText.text = e.message } ) launch(CommonPool) { try { val value = myPref() withContext(UI) { myText.text = value } } catch (e: Exception) { withContext(UI) { myText.text = value } } } RxJava Coroutines
Wrapper of an asynchronous method fun View.scale(v: Float): ViewPropertyAnimator { return animate() .scaleX(v) .scaleY(v) }4 text.setOnClickListener { text.scale(2f).withEndAction { text.scale(1f) }1 }2
Wrapper of an asynchronous method fun View.scale(value: Float): Completable = Completable.create { emitter -> val animator = animate() animator.scaleX(value).scaleY(value).withEndAction { emitter.onComplete() }3 }4 text.setOnClickListener { text.scale(2f) .andThen(text.scale(1f)) .subscribe() }2 RxJava
Wrapper of an asynchronous method fun View.scale(value: Float): Completable = Completable.create { emitter -> val animator = animate() animator.scaleX(value).scaleY(value).withEndAction { emitter.setCancellable(null) emitter.onComplete() } emitter.setCancellable { animator.cancel() }3 }4 RxJava
Wrapper of an asynchronous method suspend fun View.scale(v: Float) = suspendCoroutine<Unit> { continuation -> animate() .scaleX(v) .scaleY(v) .withEndAction { continuation.resume(Unit) } }1 text.setOnClickListener { launch(UI) { text.scale(2f) text.scale(1f) } } Coroutines
Wrapper of an asynchronous method suspend fun View.scale(v: Float) = suspendCancellableCoroutine<Unit> { continuation -> val animator = animate() animator .scaleX(v) .scaleY(v) .withEndAction { continuation.resume(Unit) } continuation.invokeOnCancellation { animator.cancel() } }1 Coroutines
suspend fun View.scale(v: Float) = suspendCoroutine<Unit> { continuation -> animate() .scaleX(v) .scaleY(v) .withEndAction { continuation.resume(Unit) }3 }4 launch(UI) { text.scale(2f) println(“Where am I?”) }5 fun View.scale(value: Float): Completable = Completable.create { emitter -> animate() .scaleX(value) .scaleY(value) .withEndAction_{ emitter.onComplete() }1 }2 text.scale(2f) .doOnComplete { println("Where am I?") } .subscribe() Threading RxJava Coroutines
suspend fun View.scale(v: Float) = suspendCoroutine<Unit> { continuation -> animate() .scaleX(v) .scaleY(v) .withEndAction { thread { continuation.resume(Unit) } }3 }4 launch(UI) { text.scale(2f) println(“Where am I?”) }5 fun View.scale(value: Float): Completable = Completable.create { emitter -> animate() .scaleX(value) .scaleY(value) .withEndAction_{ thread_{ emitter.onComplete() }_ }1 }2 text.scale(2f) .doOnComplete { println("Where am I?") } .subscribe() Threading RxJava Coroutines
fun View.scale(value: Float): Completable = Completable.create { emitter -> animate() .scaleX(value) .scaleY(value) .withEndAction_{ thread_{ emitter.onComplete() }_ }1 }2 text.scale(2f) .observeOn(AndroidSchedulers.mainThread()) .doOnComplete { println("Where am I?") } .subscribe() Threading RxJava
interface StackOverflowService { @GET("/users") fun getTopUsers(): Single<List<User>> @GET("/users/{userId}/badges") fun getBadges( @Path("userId") userId: Int ): Single<List<Badge>> @GET("/users/{userId}/top-tags") fun getTags( @Path("userId") userId: Int ): Single<List<Tag>> }0 interface StackOverflowService { @GET("/users") fun getTopUsers(): Deferred<List<User>> @GET("/users/{userId}/badges") fun getBadges( @Path("userId") userId: Int ): Deferred<List<Badge>> @GET("/users/{userId}/top-tags") fun getTags( @Path("userId") userId: Int ): Deferred<List<Tag>> }1 RxJava Coroutines
class MyViewModel( private val service: StackOverflowService ) : ViewModel() { private val disposable = CompositeDisposable() fun load() { disposable += service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 }A private fun updateUi(s: Any) { //... }B override fun onCleared() { disposable.clear() }C }D class MyViewModel( private val service: StackOverflowService ) : ViewModel() { private val job = Job() fun load() { launch(CommonPool + job) { try { val users = service.getTopUsers().await() updateUi(users) } catch (e: Exception) { updateUi(e) }4 }5 }E private suspend fun updateUi(s: Any) { withContext(UI) { //... }F }1 override fun onCleared() { job.cancel() }G }H RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() updateUi(users) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .map_{_it.first()_}_ .flatMap_{_firstUser_->_ service.getBadges(firstUser.id) }0 .subscribeOn(io()) .observeOn(mainThread()) .subscribe( {_badges_->_updateUi(badges)_}, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val firstUser = users.first() val badges = service.getBadges( firstUser.id).await() updateUi(badges) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
updateUi("loading users") service.getTopUsers() .observeOn(mainThread()) .doOnSuccess { updateUi("loading badges") } .observeOn(io()) .map_{_it.first()_}_ .flatMap_{_firstUser_->_ service.getBadges(firstUser.id) }0 .subscribeOn(io()) .observeOn(mainThread()) .subscribe( {_badges_->_updateUi(badges)_}, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { updateUi("loading users") val users = service.getTopUsers().await() updateUi("loading badges") val firstUser = users.first() val badges = service.getBadges( firstUser.id).await() updateUi(badges) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .map_{_it.first()_}_ .flatMap_{_firstUser_->_ service.getBadges(firstUser.id) .map { badges -> UserStats(firstUser, badges) }p }0 .subscribeOn(io()) .observeOn(mainThread()) .subscribe( {_user_->_updateUi(user)_}, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val firstUser = users.first() val badges = service.getBadges( firstUser.id).await() val user = UserStats(firstUser, badges) updateUi(user) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .flattenAsObservable { it.take(5) } .concatMapEager { user -> service.getBadges(user.id) .map { badges -> UserStats(user, badges) }p .toObservable() }0 .toList() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { usersWithBadges -> updateUi(usersWithBadges) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val usersWithBadges = users.take(5) .map { it to service.getBadges(it.id) } .map { (user, badges) -> UserStats(user, badges.await()) } updateUi(usersWithBadges)1 } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .flattenAsObservable { it.take(5) } .concatMapEager { user -> service.getBadges(user.id) .map { badges -> UserStats(user, badges) }p .toObservable() }0 .toList() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { usersWithBadges -> updateUi(usersWithBadges) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val usersWithBadges: List<UserStats> = users.take(5) .map { user -> async(coroutineContext) { val badges = service.getBadges(user.id) UserStats(user, badges.await()) }r }t .map { it.await() } updateUi(usersWithBadges)1 } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .flattenAsObservable { it.take(5) } .concatMapEager { user -> userDetail(user).toObservable() }0 .toList() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 fun userDetail(user: User): Single<UserStats> { return service.getBadges(user.id) .map { badges -> UserStats(user, badges) }7 }8 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val usersWithBadges: List<UserStats> = users.take(5) .map { user -> async(coroutineContext) { userDetail(user) }r }t .map { it.await() } updateUi(usersWithBadges)1 } catch (e: Exception) { updateUi(e) }4 }5 suspend fun userDetail(user: User): UserStats { val badges = service.getBadges(user.id) return UserStats(user, badges.await()) }9 RxJava Coroutines
fun userDetail(user: User): Single<UserStats> { return service.getBadges(user.id) .map { badges -> UserStats(user, badges) }7 }8 suspend fun userDetail(user: User): UserStats { val badges = service.getBadges(user.id) return UserStats(user, badges.await()) }9 RxJava Coroutines
fun userDetail(user: User): Single<UserStats> { return Singles.zip( service.getBadges(user.id).subscribeOn(io()), service.getTags(user.id).subscribeOn(io()), { badges, tags -> UserStats(user, badges, tags) } ) }8 suspend fun userDetail(it: User): UserStats { val badges = service.getBadges(it.id) val tags = service.getTags(it.id) return UserStats(it, badges.await(), tags.await()) }9 RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() updateUi(users) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retry(3) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { retry(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retry(3) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { retry(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 suspend fun <T> retry(attempts: Int = 5, f: suspend () -> T): T { repeat(attempts - 1) { try { return f() } catch (e: Exception) { } } return f() } RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retryWhen(ExponentialBackoff(3)) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { exponentialBackoff(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retryWhen(ExponentialBackoff(3)) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 class ExponentialBackoff(private val maxRetries: Int) : Function1<Flowable<out Throwable>, Publisher<*>> { private var retryCount = 0 private var currentDelay = 100L + Random().nextInt(100) override fun invoke(attempts: Flowable<out Throwable>): Publisher<*> { return attempts .flatMap { throwable -> if (++retryCount < maxRetries) Flowable.timer(currentDelay, MILLISECONDS).also { currentDelay *= 2 } else Flowable.error(throwable) } } } launch(CommonPool + job) { try { exponentialBackoff(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 suspend fun <T> exponentialBackoff( times: Int = 5, f: suspend () -> T): T { var currentDelay = 100 + Random().nextInt(100) repeat(times - 1) { try { return f() } catch (e: Exception) { } delay(currentDelay) currentDelay *= 2 } return f() } RxJava Coroutines
Debugging RxJava Coroutines
Testing RxJava JVM Trampoline scheduler blockingGet TestScheduler and TestObserver Espresso Schedulers.from(AsyncTask.THREAD_POOL_EXECUTOR) Coroutines JVM runBlocking Espresso AsyncTask.THREAD_POOL_EXECUTOR.asCoroutineDispatcher()
11051 io.reactivex 144 kotlin.coroutines 1984 kotlinx.coroutines Method count - debug
427 io.reactivex 77 kotlin.coroutines 493 kotlinx.coroutines Method count - release
Links Demo Project github.com/fabioCollini/RxJavaVsCoroutines Guide to kotlinx.coroutines by example github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md Kotlin coroutines, a deeper look medium.com/@elizarov/kotlin-coroutines-a-deeper-look-180536305c3f Async code using Kotlin Coroutines proandroiddev.com/async-code-using-kotlin-coroutines-233d201099ff Testing asynchronous RxJava code using Mockito medium.com/@fabioCollini/testing-asynchronous-rxjava-code-using-mockito-8ad831a16877
THANKS FOR YOUR ATTENTION
QUESTIONS? Android Developers Italia androiddevs.it

Async code on kotlin: rx java or/and coroutines - Kotlin Night Turin

  • 1.
    ASYNC CODE ONKOTLIN: RXJAVA OR/AND COROUTINES Fabio Collini
  • 2.
  • 5.
  • 6.
  • 7.
    RxJava Coroutines-> RxJavaCoroutines -> rxCompletable,rxMaybe, rxSingle, rxObservable, rxFlowable CompletableSource.await, MaybeSource.await, MaybeSource.awaitOrDefault, MaybeSource.openSubscription, SingleSource.await, ObservableSource.awaitFirst, ObservableSource.awaitFirstOrDefault, ObservableSource.awaitFirstOrElse, ObservableSource.awaitFirstOrNull, ObservableSource.awaitLast, ObservableSource.awaitSingle, ObservableSource.openSubscription, ObservableSource.iterator github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-rx2
  • 8.
    Single Maybe Completable Observable Flowable Cold Vs Hot Subjects suspendmethods Job/Deferred Channels Actors Producers Publishers BroadcastChannels
  • 9.
    Wrapper of asynchronous method fun myPref(): String = prefs.getString("myPref", "") fun myPref(): Single<String> = Single.fromCallable { prefs.getString("myPref", "") }1 suspend fun myPref(): String = withContext(CommonPool) { prefs.getString("myPref", "") }2 RxJavaCoroutines
  • 10.
    Thread & errormanagement fun myPref(): Single<String> = Single.fromCallable { prefs.getString("myPref", "") }1 suspend fun myPref(): String = withContext(CommonPool) { prefs.getString("myPref", "") }2 myPref() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { myText.text = it }, { e -> myText.text = e.message } ) launch(CommonPool) { try { val value = myPref() withContext(UI) { myText.text = value } } catch (e: Exception) { withContext(UI) { myText.text = value } } } RxJava Coroutines
  • 11.
    Wrapper of anasynchronous method fun View.scale(v: Float): ViewPropertyAnimator { return animate() .scaleX(v) .scaleY(v) }4 text.setOnClickListener { text.scale(2f).withEndAction { text.scale(1f) }1 }2
  • 12.
    Wrapper of anasynchronous method fun View.scale(value: Float): Completable = Completable.create { emitter -> val animator = animate() animator.scaleX(value).scaleY(value).withEndAction { emitter.onComplete() }3 }4 text.setOnClickListener { text.scale(2f) .andThen(text.scale(1f)) .subscribe() }2 RxJava
  • 13.
    Wrapper of anasynchronous method fun View.scale(value: Float): Completable = Completable.create { emitter -> val animator = animate() animator.scaleX(value).scaleY(value).withEndAction { emitter.setCancellable(null) emitter.onComplete() } emitter.setCancellable { animator.cancel() }3 }4 RxJava
  • 14.
    Wrapper of anasynchronous method suspend fun View.scale(v: Float) = suspendCoroutine<Unit> { continuation -> animate() .scaleX(v) .scaleY(v) .withEndAction { continuation.resume(Unit) } }1 text.setOnClickListener { launch(UI) { text.scale(2f) text.scale(1f) } } Coroutines
  • 15.
    Wrapper of anasynchronous method suspend fun View.scale(v: Float) = suspendCancellableCoroutine<Unit> { continuation -> val animator = animate() animator .scaleX(v) .scaleY(v) .withEndAction { continuation.resume(Unit) } continuation.invokeOnCancellation { animator.cancel() } }1 Coroutines
  • 16.
    suspend fun View.scale(v:Float) = suspendCoroutine<Unit> { continuation -> animate() .scaleX(v) .scaleY(v) .withEndAction { continuation.resume(Unit) }3 }4 launch(UI) { text.scale(2f) println(“Where am I?”) }5 fun View.scale(value: Float): Completable = Completable.create { emitter -> animate() .scaleX(value) .scaleY(value) .withEndAction_{ emitter.onComplete() }1 }2 text.scale(2f) .doOnComplete { println("Where am I?") } .subscribe() Threading RxJava Coroutines
  • 17.
    suspend fun View.scale(v:Float) = suspendCoroutine<Unit> { continuation -> animate() .scaleX(v) .scaleY(v) .withEndAction { thread { continuation.resume(Unit) } }3 }4 launch(UI) { text.scale(2f) println(“Where am I?”) }5 fun View.scale(value: Float): Completable = Completable.create { emitter -> animate() .scaleX(value) .scaleY(value) .withEndAction_{ thread_{ emitter.onComplete() }_ }1 }2 text.scale(2f) .doOnComplete { println("Where am I?") } .subscribe() Threading RxJava Coroutines
  • 18.
    fun View.scale(value: Float):Completable = Completable.create { emitter -> animate() .scaleX(value) .scaleY(value) .withEndAction_{ thread_{ emitter.onComplete() }_ }1 }2 text.scale(2f) .observeOn(AndroidSchedulers.mainThread()) .doOnComplete { println("Where am I?") } .subscribe() Threading RxJava
  • 19.
    interface StackOverflowService { @GET("/users") fungetTopUsers(): Single<List<User>> @GET("/users/{userId}/badges") fun getBadges( @Path("userId") userId: Int ): Single<List<Badge>> @GET("/users/{userId}/top-tags") fun getTags( @Path("userId") userId: Int ): Single<List<Tag>> }0 interface StackOverflowService { @GET("/users") fun getTopUsers(): Deferred<List<User>> @GET("/users/{userId}/badges") fun getBadges( @Path("userId") userId: Int ): Deferred<List<Badge>> @GET("/users/{userId}/top-tags") fun getTags( @Path("userId") userId: Int ): Deferred<List<Tag>> }1 RxJava Coroutines
  • 20.
    class MyViewModel( private valservice: StackOverflowService ) : ViewModel() { private val disposable = CompositeDisposable() fun load() { disposable += service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 }A private fun updateUi(s: Any) { //... }B override fun onCleared() { disposable.clear() }C }D class MyViewModel( private val service: StackOverflowService ) : ViewModel() { private val job = Job() fun load() { launch(CommonPool + job) { try { val users = service.getTopUsers().await() updateUi(users) } catch (e: Exception) { updateUi(e) }4 }5 }E private suspend fun updateUi(s: Any) { withContext(UI) { //... }F }1 override fun onCleared() { job.cancel() }G }H RxJava Coroutines
  • 21.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users ->updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() updateUi(users) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 22.
    service.getTopUsers() .map_{_it.first()_}_ .flatMap_{_firstUser_->_ service.getBadges(firstUser.id) }0 .subscribeOn(io()) .observeOn(mainThread()) .subscribe( {_badges_->_updateUi(badges)_}, { e ->updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val firstUser = users.first() val badges = service.getBadges( firstUser.id).await() updateUi(badges) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 23.
    updateUi("loading users") service.getTopUsers() .observeOn(mainThread()) .doOnSuccess {updateUi("loading badges") } .observeOn(io()) .map_{_it.first()_}_ .flatMap_{_firstUser_->_ service.getBadges(firstUser.id) }0 .subscribeOn(io()) .observeOn(mainThread()) .subscribe( {_badges_->_updateUi(badges)_}, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { updateUi("loading users") val users = service.getTopUsers().await() updateUi("loading badges") val firstUser = users.first() val badges = service.getBadges( firstUser.id).await() updateUi(badges) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 24.
    service.getTopUsers() .map_{_it.first()_}_ .flatMap_{_firstUser_->_ service.getBadges(firstUser.id) .map { badges-> UserStats(firstUser, badges) }p }0 .subscribeOn(io()) .observeOn(mainThread()) .subscribe( {_user_->_updateUi(user)_}, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val firstUser = users.first() val badges = service.getBadges( firstUser.id).await() val user = UserStats(firstUser, badges) updateUi(user) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 25.
    service.getTopUsers() .flattenAsObservable { it.take(5)} .concatMapEager { user -> service.getBadges(user.id) .map { badges -> UserStats(user, badges) }p .toObservable() }0 .toList() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { usersWithBadges -> updateUi(usersWithBadges) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val usersWithBadges = users.take(5) .map { it to service.getBadges(it.id) } .map { (user, badges) -> UserStats(user, badges.await()) } updateUi(usersWithBadges)1 } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 26.
    service.getTopUsers() .flattenAsObservable { it.take(5)} .concatMapEager { user -> service.getBadges(user.id) .map { badges -> UserStats(user, badges) }p .toObservable() }0 .toList() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { usersWithBadges -> updateUi(usersWithBadges) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val usersWithBadges: List<UserStats> = users.take(5) .map { user -> async(coroutineContext) { val badges = service.getBadges(user.id) UserStats(user, badges.await()) }r }t .map { it.await() } updateUi(usersWithBadges)1 } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 27.
    service.getTopUsers() .flattenAsObservable { it.take(5)} .concatMapEager { user -> userDetail(user).toObservable() }0 .toList() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users -> updateUi(users) }, { e -> updateUi(e) } )1 fun userDetail(user: User): Single<UserStats> { return service.getBadges(user.id) .map { badges -> UserStats(user, badges) }7 }8 launch(CommonPool + job) { try { val users = service.getTopUsers().await() val usersWithBadges: List<UserStats> = users.take(5) .map { user -> async(coroutineContext) { userDetail(user) }r }t .map { it.await() } updateUi(usersWithBadges)1 } catch (e: Exception) { updateUi(e) }4 }5 suspend fun userDetail(user: User): UserStats { val badges = service.getBadges(user.id) return UserStats(user, badges.await()) }9 RxJava Coroutines
  • 28.
    fun userDetail(user: User):Single<UserStats> { return service.getBadges(user.id) .map { badges -> UserStats(user, badges) }7 }8 suspend fun userDetail(user: User): UserStats { val badges = service.getBadges(user.id) return UserStats(user, badges.await()) }9 RxJava Coroutines
  • 29.
    fun userDetail(user: User):Single<UserStats> { return Singles.zip( service.getBadges(user.id).subscribeOn(io()), service.getTags(user.id).subscribeOn(io()), { badges, tags -> UserStats(user, badges, tags) } ) }8 suspend fun userDetail(it: User): UserStats { val badges = service.getBadges(it.id) val tags = service.getTags(it.id) return UserStats(it, badges.await(), tags.await()) }9 RxJava Coroutines
  • 30.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .subscribe( { users ->updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { val users = service.getTopUsers().await() updateUi(users) } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 31.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .subscribe( { users-> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 } catch (e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 32.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retry(3) .subscribe( { users-> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { retry(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 33.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retry(3) .subscribe( { users-> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { retry(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 suspend fun <T> retry(attempts: Int = 5, f: suspend () -> T): T { repeat(attempts - 1) { try { return f() } catch (e: Exception) { } } return f() } RxJava Coroutines
  • 34.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retryWhen(ExponentialBackoff(3)) .subscribe( { users-> updateUi(users) }, { e -> updateUi(e) } )1 launch(CommonPool + job) { try { exponentialBackoff(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 RxJava Coroutines
  • 35.
    service.getTopUsers() .subscribeOn(io()) .observeOn(mainThread()) .timeout(10, SECONDS) .retryWhen(ExponentialBackoff(3)) .subscribe( { users-> updateUi(users) }, { e -> updateUi(e) } )1 class ExponentialBackoff(private val maxRetries: Int) : Function1<Flowable<out Throwable>, Publisher<*>> { private var retryCount = 0 private var currentDelay = 100L + Random().nextInt(100) override fun invoke(attempts: Flowable<out Throwable>): Publisher<*> { return attempts .flatMap { throwable -> if (++retryCount < maxRetries) Flowable.timer(currentDelay, MILLISECONDS).also { currentDelay *= 2 } else Flowable.error(throwable) } } } launch(CommonPool + job) { try { exponentialBackoff(3) { withTimeout(10, SECONDS) { val users = service.getTopUsers().await() updateUi(users) }6 }7 } catch_(e: Exception) { updateUi(e) }4 }5 suspend fun <T> exponentialBackoff( times: Int = 5, f: suspend () -> T): T { var currentDelay = 100 + Random().nextInt(100) repeat(times - 1) { try { return f() } catch (e: Exception) { } delay(currentDelay) currentDelay *= 2 } return f() } RxJava Coroutines
  • 36.
  • 37.
    Testing RxJava JVM Trampoline scheduler blockingGet TestScheduler andTestObserver Espresso Schedulers.from(AsyncTask.THREAD_POOL_EXECUTOR) Coroutines JVM runBlocking Espresso AsyncTask.THREAD_POOL_EXECUTOR.asCoroutineDispatcher()
  • 38.
    11051 io.reactivex 144 kotlin.coroutines 1984kotlinx.coroutines Method count - debug
  • 39.
    427 io.reactivex 77 kotlin.coroutines 493kotlinx.coroutines Method count - release
  • 40.
    Links Demo Project github.com/fabioCollini/RxJavaVsCoroutines Guide tokotlinx.coroutines by example github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md Kotlin coroutines, a deeper look medium.com/@elizarov/kotlin-coroutines-a-deeper-look-180536305c3f Async code using Kotlin Coroutines proandroiddev.com/async-code-using-kotlin-coroutines-233d201099ff Testing asynchronous RxJava code using Mockito medium.com/@fabioCollini/testing-asynchronous-rxjava-code-using-mockito-8ad831a16877
  • 41.
  • 42.