Hello all. I’ve done my best to discover what is wrong, but to no avail. This problem is based on some work I did earlier, which I am convinced worked as expected. However, I can no longer reproduce the exception condition reliably, even after trying versions as far back as kotlinx.coroutines 0.14.1.
The following code demonstrates what I believe is either the issue or a hole in my understanding.
    package coroutines

    import kotlinx.coroutines.experimental.CoroutineDispatcher
    import kotlinx.coroutines.experimental.asCoroutineDispatcher
    import kotlinx.coroutines.experimental.async
    import kotlinx.coroutines.experimental.runBlocking
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.Executors
    import java.util.concurrent.TimeUnit
    import kotlin.coroutines.experimental.AbstractCoroutineContextElement
    import kotlin.coroutines.experimental.Continuation
    import kotlin.coroutines.experimental.ContinuationInterceptor
    import kotlin.coroutines.experimental.CoroutineContext

    val dispatchExecutor: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)

    // the thread local I want to maintain
    val threadInt = ThreadLocal<Int>()

    val latch = CountDownLatch(1)

    fun main(args: Array<String>) {
        val result = runBlocking {
            val dispatcher: CoroutineDispatcher = dispatchExecutor.asCoroutineDispatcher()
            val intContext = InterceptingContext(dispatcher, latch)
            async(intContext) {
                println("Before exception")
                throw Exception("Whoops")
                println("After exception")
                3 //arbitrary value to return from Deferred
            }.await()
        }
        latch.await(5, TimeUnit.SECONDS)
        println("Done $result")
    }

    class InterceptingContext(private val delegateInterceptor: ContinuationInterceptor,
                              private val latch: CountDownLatch)
        : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
            val wrapped = WrappedContinuation(continuation, latch,
                    { println("In pre-block") },
                    { println("In post-block") })
            return delegateInterceptor.interceptContinuation(wrapped)
        }
    }

    class WrappedContinuation<in T>(
            private val continuation: Continuation<T>,
            private val latch: CountDownLatch,
            private val preblock: () -> Unit,
            private val postblock: () -> Unit) : Continuation<T> {

        override val context: CoroutineContext
            get() = continuation.context

        override fun resume(value: T) {
            println("${Thread.currentThread().name} In resume")
            preblock()
            try {
                continuation.resume(value)
            } finally {
                postblock()
            }
        }

        override fun resumeWithException(exception: Throwable) {
            println("${Thread.currentThread().name} In resumeWithException")
            preblock()
            try {
                continuation.resumeWithException(exception)
            } finally {
                postblock()
                latch.countDown()
            }
        }
    }

The point of the above code is to enable arbitrary execution of code before and after any resumption of a continuation. I’ve made two assumptions:
- My WrappedContinuation instance will intercept and delegate “resume(…)” (it does)
- If any exception occurs during execution of resume, the WrappedContinuation instance will intercept and delegate “resumeWithException(…)” (it does not)
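
To make the second assumption easier to test in isolation, here is a minimal sketch that needs only the standard library. It assumes the stable kotlin.coroutines API (Kotlin 1.3 or later), where resume and resumeWithException are merged into a single resumeWith(Result), rather than the experimental packages used above, so it may not match the experimental behaviour exactly. LoggingInterceptor, completion and events are hypothetical names introduced only for this illustration. It compares a coroutine whose body throws with one whose suspension point is resumed with an exception.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Records, in order, what the wrapper and the completion continuations observe.
val events = mutableListOf<String>()

// Hypothetical interceptor: wraps the coroutine body's continuation and
// records how that continuation is resumed before delegating to it.
class LoggingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext
                get() = continuation.context

            override fun resumeWith(result: Result<T>) {
                events += if (result.isSuccess) "wrapper: resumed with value"
                          else "wrapper: resumed with exception"
                continuation.resumeWith(result)
            }
        }
}

// Hypothetical completion continuation: receives the coroutine's final outcome.
fun completion(name: String) = object : Continuation<Unit> {
    override val context: CoroutineContext = LoggingInterceptor()

    override fun resumeWith(result: Result<Unit>) {
        events += "$name completion: ${if (result.isSuccess) "success" else "failure"}"
    }
}

fun main() {
    // Case 1: the coroutine body itself throws, as in the code above.
    val throwing: suspend () -> Unit = { throw Exception("Whoops") }
    throwing.startCoroutine(completion("case 1"))

    // Case 2: the body suspends, and the suspension point is later
    // resumed with an exception from outside the coroutine.
    var pending: Continuation<Unit>? = null
    val suspending: suspend () -> Unit = { suspendCoroutine<Unit> { pending = it } }
    suspending.startCoroutine(completion("case 2"))
    pending!!.resumeWithException(Exception("Whoops"))

    events.forEach(::println)
}
```

If this sketch is representative, the wrapper sees a failure only in case 2, where an exception is delivered into the coroutine at a suspension point. In case 1 the wrapper sees only the initial successful resume, and the exception thrown by the body is reported to the completion continuation instead, which would be consistent with the output I am seeing.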
Am I missing something? I’ve assumed symmetry in this case, and my endeavour to debug through the kotlinx code has not left me any wiser, though I’ve seen plenty of invocations of resumeWithException as a result of the exception…just not on my wrapped continuation!
output:
    pool-1-thread-1 @coroutine#2 In resume
    In pre-block
    Before exception
    In post-block
    Exception in thread "main" java.lang.Exception: Whoops
        at coroutines.TestExecutor2Kt$main$result$1$1.doResume(TestExecutor2.kt:29)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:54)
        at coroutines.WrappedContinuation.resume(TestExecutor2.kt:68)
        at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:152)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Any insight would be gratefully received.