[Coroutines] resumeWithException not working as expected with custom continuation

Hello all. I’ve done my best to discover what is wrong, but to no avail. This problem is based on some earlier work that I am convinced behaved as expected. However, I am not able to reproduce the exception condition reliably, having tried versions as far back as kotlinx.coroutines 0.14.1.

The following code demonstrates what I believe is either the issue or a hole in my understanding.

    package coroutines

    import kotlinx.coroutines.experimental.CoroutineDispatcher
    import kotlinx.coroutines.experimental.asCoroutineDispatcher
    import kotlinx.coroutines.experimental.async
    import kotlinx.coroutines.experimental.runBlocking
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.Executors
    import java.util.concurrent.TimeUnit
    import kotlin.coroutines.experimental.AbstractCoroutineContextElement
    import kotlin.coroutines.experimental.Continuation
    import kotlin.coroutines.experimental.ContinuationInterceptor
    import kotlin.coroutines.experimental.CoroutineContext

    val dispatchExecutor: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)

    // the thread local I want to maintain
    val threadInt = ThreadLocal<Int>()
    val latch = CountDownLatch(1)

    fun main(args: Array<String>) {
        val result = runBlocking {
            val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
            val intContext = InterceptingContext(dispatcher, latch)
            async(intContext) {
                println("Before exception")
                throw Exception("Whoops")
                println("After exception")
                3 //arbitrary value to return from Deferred
            }.await()
        }
        latch.await(5, TimeUnit.SECONDS)
        println("Done $result")
    }

    class InterceptingContext(
        private val delegateInterceptor: ContinuationInterceptor,
        private val latch: CountDownLatch
    ) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
            val wrapped = WrappedContinuation(continuation, latch,
                { println("In pre-block") },
                { println("In post-block") })
            return delegateInterceptor.interceptContinuation(wrapped)
        }
    }

    class WrappedContinuation<in T>(
        private val continuation: Continuation<T>,
        private val latch: CountDownLatch,
        private val preblock: () -> Unit,
        private val postblock: () -> Unit
    ) : Continuation<T> {
        override val context: CoroutineContext
            get() = continuation.context

        override fun resume(value: T) {
            println("${Thread.currentThread().name} In resume")
            preblock()
            try {
                continuation.resume(value)
            } finally {
                postblock()
            }
        }

        override fun resumeWithException(exception: Throwable) {
            println("${Thread.currentThread().name} In resumeWithException")
            preblock()
            try {
                continuation.resumeWithException(exception)
            } finally {
                postblock()
                latch.countDown()
            }
        }
    }

The point of the above code is to enable arbitrary execution of code before and after any resumption of a continuation. I’ve made two assumptions:

  1. My WrappedContinuation instance will intercept and delegate “resume(…)” (it does).
  2. If any exception occurs during execution of resume, the WrappedContinuation instance will intercept and delegate “resumeWithException(…)” (it does not).

Am I missing something? I’ve assumed symmetry in this case, and my endeavour to debug through the kotlinx code has not left me any wiser, though I’ve seen plenty of invocations of resumeWithException as a result of the exception…just not on my wrapped continuation!

output:

    pool-1-thread-1 @coroutine#2 In resume
    In pre-block
    Before exception
    In post-block
    Exception in thread "main" java.lang.Exception: Whoops
        at coroutines.TestExecutor2Kt$main$result$1$1.doResume(TestExecutor2.kt:29)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
        at coroutines.WrappedContinuation.resume(TestExecutor2.kt:68)
        at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:152)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Any insight would be gratefully received.

Can you please explain what is not working? For example, what results did you expect from running this code, and what results are you actually getting?

I’m running this code and I’m not seeing any anomalies. The coroutine that you have in this code has a single continuation – from the start of the coroutine to its completion with the throw (it does not have suspensions so the initial continuation is all it has), so I’m seeing one interception occurring, printing:

    In pre-block      // before continuation (starting coroutine)
    Before exception  // execution code in the coroutine
    In post-block     // after continuation (completing coroutine)
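The same behaviour can be reproduced without kotlinx at all. The following is a minimal sketch, assuming the current stable `kotlin.coroutines` standard library API (where `resume` and `resumeWithException` are merged into a single `resumeWith(Result)`), not the experimental packages used in this thread. `LoggingInterceptor` and `events` are hypothetical names introduced for illustration. It suggests that an exception thrown by the coroutine body is delivered to the completion continuation, while the intercepted continuation only ever sees the initial successful resumption.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical event log so the order of calls can be inspected afterwards.
val events = mutableListOf<String>()

// Hypothetical interceptor: wraps every continuation and records how it is resumed.
class LoggingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                events += if (result.isSuccess) "intercepted: resumed with value"
                          else "intercepted: resumed with exception"
                continuation.resumeWith(result)
            }
        }
}

fun main() {
    // A coroutine body with no suspension points that fails immediately.
    val body: suspend () -> Int = {
        events += "body: before exception"
        throw Exception("Whoops")
    }
    // The completion continuation receives the final outcome of the coroutine.
    body.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = LoggingInterceptor()
        override fun resumeWith(result: Result<Int>) {
            events += "completion: " + (result.exceptionOrNull()?.message ?: "value")
        }
    })
    events.forEach(::println)
}
```

In this sketch the wrapper is invoked once, to start the coroutine, and the failure shows up only in the completion callback.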

In my enthusiasm for using coroutines, I made some assumptions about how and when resumeWithException(…) would be called.

Assumption 1: If an exception is thrown in the coroutine after “resume”, the same wrapped continuation would have resumeWithException(…) invoked as an exception “callback” of sorts.

– the above appears to be wrong, given the test code already provided in this post. I think I also understand why my assumption was wrong.

Assumption 2: If coroutine B suspends on coroutine A, and coroutine A throws an Exception, then coroutine B would have “resumeWithException” raised.

So I tried a different test, based on the one above.

    package coroutines

    import kotlinx.coroutines.experimental.CoroutineDispatcher
    import kotlinx.coroutines.experimental.Deferred
    import kotlinx.coroutines.experimental.asCoroutineDispatcher
    import kotlinx.coroutines.experimental.async
    import kotlinx.coroutines.experimental.delay
    import kotlinx.coroutines.experimental.runBlocking
    import java.util.concurrent.Executors
    import java.util.concurrent.RejectedExecutionException
    import kotlin.coroutines.experimental.AbstractCoroutineContextElement
    import kotlin.coroutines.experimental.Continuation
    import kotlin.coroutines.experimental.ContinuationInterceptor
    import kotlin.coroutines.experimental.CoroutineContext

    //just an coroutines.getExecutor with two threads and a blocking backing queue
    val dispatchExecutor1: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)
    val dispatchExecutor2: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)

    // the thread local I want to maintain

    fun main(args: Array<String>) {
        val dispatcher1: CoroutineDispatcher = dispatchExecutor1.asCoroutineDispatcher()
        val dispatcher2: CoroutineDispatcher = dispatchExecutor2.asCoroutineDispatcher()

        val deferred1: Deferred<Int> = dispatch(dispatcher1) {
            println("Before exception")
            // delay(5000)
            throw Exception("Whoops")
        }
        val deferred2: Deferred<Int> = dispatch(dispatcher2) {
            val result1 = deferred1.await()
            result1 + 1
        }

        runBlocking {
            val finalResult = deferred2.await()
            println("Done $finalResult")
        }

        dispatchExecutor1.shutdownNow()
        dispatchExecutor2.shutdownNow()
    }

    class InterceptingContext(private val delegateInterceptor: ContinuationInterceptor) :
        AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
            val wrapped = WrappedContinuation(continuation,
                { println("In pre-block ${Thread.currentThread().name}") },
                { println("In post-block ${Thread.currentThread().name}") })
            return delegateInterceptor.interceptContinuation(wrapped)
        }
    }

    class WrappedContinuation<in T>(
        private val continuation: Continuation<T>,
        private val preblock: () -> Unit,
        private val postblock: () -> Unit
    ) : Continuation<T> {
        override val context: CoroutineContext
            get() = continuation.context

        override fun resume(value: T) {
            println("${Thread.currentThread().name} In resume")
            preblock()
            try {
                continuation.resume(value)
            } finally {
                postblock()
            }
        }

        override fun resumeWithException(exception: Throwable) {
            println("${Thread.currentThread().name} In resumeWithException")
            preblock()
            try {
                continuation.resumeWithException(exception)
            } finally {
                postblock()
            }
        }
    }

    private fun <R> dispatch(dispatcher: CoroutineDispatcher, block: suspend () -> R): Deferred<R> {
        return try {
            async(InterceptingContext(dispatcher)) {
                block()
            }
        } catch (ree: RejectedExecutionException) {
            ree.printStackTrace()
            throw ree
        }
    }

The main difference is that there are now two coroutines instead of one, and the second waits on the first. And here’s where I am confused again. If I uncomment the “delay(5000)” line in coroutine 1, then I do, in fact, get resumeWithException raised in coroutine 2.

    pool-1-thread-1 @coroutine#1 In resume
    pool-2-thread-1 @coroutine#2 In resume
    In pre-block pool-2-thread-1 @coroutine#2
    In pre-block pool-1-thread-1 @coroutine#1
    Before exception
    In post-block pool-2-thread-1 @coroutine#2
    In post-block pool-1-thread-1 @coroutine#1
    pool-1-thread-2 @coroutine#1 In resume
    In pre-block pool-1-thread-2 @coroutine#1
    In post-block pool-1-thread-2 @coroutine#1
    pool-2-thread-2 @coroutine#2 In resumeWithException
    In pre-block pool-2-thread-2 @coroutine#2
    In post-block pool-2-thread-2 @coroutine#2
    Exception in thread "main" java.lang.Exception: Whoops
        at coroutines.TestExecutor2Kt$main$deferred1$1.doResume(TestExecutor2.kt:28)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
        at coroutines.WrappedContinuation.resume(TestExecutor2.kt:71)
        at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:152)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

But if I leave the delay commented out, then resumeWithException never gets called…the exception just percolates out of the main method. This implies to me some timing issue in getting the resumeWithException invocation, in that coroutine1 failed before coroutine2 was even waiting for it, and thus the exception was “lost”.

My biggest hope here is that there is clear best practice on capturing exceptions in pipelines of coroutines. The only solution I can see to get exception logging closer to the exception detection is to wrap the call to “block()” in a try catch, and log there.
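The try/catch idea from the previous paragraph could look roughly like the sketch below. It uses only the stable `kotlin.coroutines` standard library so that it runs without kotlinx. `loggingFailures` and `failures` are hypothetical names, and collecting messages in a list stands in for real logging. When combining this with kotlinx.coroutines, cancellation exceptions would probably need separate treatment, which this sketch does not attempt.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical sink for failure messages; a real pipeline would use a logger.
val failures = mutableListOf<String>()

// Hypothetical helper: runs the block, records any exception next to where it
// was thrown, then rethrows so that downstream awaiters still see the failure.
suspend fun <R> loggingFailures(name: String, block: suspend () -> R): R =
    try {
        block()
    } catch (e: Exception) {
        failures += "$name failed: ${e.message}"
        throw e
    }

fun main() {
    var outcome: Result<Int>? = null
    val task: suspend () -> Int = {
        loggingFailures<Int>("stage-1") { throw Exception("Whoops") }
    }
    // Start the coroutine; the completion callback stores the final result.
    task.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    println(failures)
    println(outcome?.exceptionOrNull()?.message)
}
```

Because the helper rethrows, the exception is logged once at its origin and still propagates to whoever awaits the result.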

Without the delay, the exception is thrown by the deferred1 coroutine so fast that the deferred2 coroutine does not have a chance to suspend. Its await() call simply throws the exception inline and execution continues from there, so no continuation is resumed. With the delay in deferred1, however, the second coroutine suspends and is later resumed with the exception.
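The two paths described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `TinyDeferred` is a hypothetical, single-threaded stand-in for `Deferred`, not the kotlinx implementation, and the code targets the current stable `kotlin.coroutines` API, where `resumeWith(Result)` replaces `resume` and `resumeWithException`. `LoggingInterceptor` and `scenario` are likewise made up for illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.startCoroutine

// Hypothetical single-threaded stand-in for Deferred; not the kotlinx implementation.
class TinyDeferred<T> {
    private var outcome: Result<T>? = null
    private var waiter: Continuation<T>? = null

    fun complete(result: Result<T>) {
        outcome = result
        waiter?.resumeWith(result) // slow path: wake the suspended waiter
    }

    suspend fun await(): T = suspendCoroutineUninterceptedOrReturn { cont ->
        val done = outcome
        if (done != null) {
            done.getOrThrow() // fast path: value or exception delivered inline
        } else {
            waiter = cont.intercepted() // resume later through the interceptor
            COROUTINE_SUSPENDED
        }
    }
}

// Hypothetical interceptor that records how each continuation is resumed.
class LoggingInterceptor(private val log: MutableList<String>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += if (result.isSuccess) "resume" else "resumeWithException"
                continuation.resumeWith(result)
            }
        }
}

// Runs one awaiting coroutine; the deferred fails either before or after await().
fun scenario(failBeforeAwait: Boolean): List<String> {
    val log = mutableListOf<String>()
    val deferred = TinyDeferred<Int>()
    if (failBeforeAwait) deferred.complete(Result.failure(Exception("Whoops")))
    val awaiting: suspend () -> Int = { deferred.await() + 1 }
    awaiting.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = LoggingInterceptor(log)
        override fun resumeWith(result: Result<Int>) {
            log += "completion: ${result.exceptionOrNull()?.message}"
        }
    })
    if (!failBeforeAwait) deferred.complete(Result.failure(Exception("Whoops")))
    return log
}

fun main() {
    println(scenario(failBeforeAwait = true))  // no suspension, no exceptional resume
    println(scenario(failBeforeAwait = false)) // suspended, then resumed with exception
}
```

In the first scenario the interceptor records only the initial resume. In the second it also records an exceptional resumption, which mirrors the difference between the runs with and without delay(5000).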

OK, I understand now. Thank you for taking the time to answer.

@elizarov when can we expect coroutines to be finalized? I am really excited to play with them.

They will be finalized when we have received enough feedback to be confident in all the details of their design. Meanwhile, we’ve released them in Kotlin 1.1 so that you can start using them. For details, see the Stack Overflow question “Can "experimental" Kotlin coroutines be used in production?”.
