Why two different implementations?
In this article I will refer to Spring and Resilience4j.
If you are reading this article, you probably already know the bulkhead pattern, the problem it is meant to solve, and its two most common implementations: semaphore based and threadPool based.
At least for me, it was not easy to work out when to use the semaphore implementation and when to use the threadPool one.
I know how a Semaphore works, and I also understand the ThreadPool pattern, so the short and most obvious answer could have been: use a threadPool to limit the number of asynchronous calls and a semaphore to limit synchronous ones.
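To make that short answer concrete, here is a minimal sketch of both styles using only `java.util.concurrent`. It is not how Resilience4j implements its bulkheads; the class name `BulkheadSketch` and the helpers `callWithSemaphore` and `newBoundedPool` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Spring or Resilience4j.
class BulkheadSketch {

    /** Semaphore style: the work runs on the caller's own thread, guarded by permits. */
    static <T> T callWithSemaphore(Semaphore permits, Callable<T> work) throws Exception {
        if (!permits.tryAcquire()) {
            // No permit left: reject immediately instead of waiting.
            throw new RejectedExecutionException("bulkhead full");
        }
        try {
            return work.call();
        } finally {
            permits.release();
        }
    }

    /** Thread-pool style: work is handed to a fixed set of workers plus a bounded queue. */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // reject when workers and queue are full
    }

    public static void main(String[] args) throws Exception {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();

        // Synchronous call: executes on the calling thread.
        Semaphore permits = new Semaphore(2);
        System.out.println("semaphore call ran on: " + callWithSemaphore(permits, whoAmI));

        // Asynchronous call: executes on one of the pool's worker threads.
        ThreadPoolExecutor pool = newBoundedPool(2, 1);
        Future<String> result = pool.submit(whoAmI);
        System.out.println("pool call ran on: " + result.get());
        pool.shutdown();
    }
}
```

The difference to notice is where the work runs: the semaphore only counts callers and leaves them on their own thread, while the pool both limits concurrency and supplies the threads.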
So what was the difficult part? These questions:
Why not just combine @Async with a semaphore based bulkhead implementation?
Can both annotations be used together? If yes, what is the reason for a threadPool implementation?
@Async and @Bulkhead combined.
    @Bulkhead(name = "Service3", fallbackMethod = "futureFallback")
    @Async
    public CompletableFuture<String> doSomeWork() {
        System.out.println("Executing service 3 - " + Thread.currentThread().getName());
        Util.mockExternalServiceHttpCall(DELAY);
        return CompletableFuture.completedFuture("ok");
    }
Yes, you can use both annotations together. They allow you to generate a limited number of async calls based on your bulkhead semaphore configuration.
However, there are some implications that are useful to know.
SimpleAsyncTaskExecutor.
When you annotate a method with the @Async annotation, Spring can use different implementations of the TaskExecutor interface.
By default, the framework uses the SimpleAsyncTaskExecutor.
This implementation will create a new thread each time the annotated method is invoked; this thread will not be reused.
The problem with this approach is that you will create a new thread even if the semaphore counter is 0 (the bulkhead is full).
As you can see in the following stack trace, the framework first creates a thread and then calls the bulkhead, which checks whether a permit is available to continue the execution. If the semaphore counter is zero, the bulkhead rejects the method execution.
Thread [_simpleAsyncTask1] (Suspended (breakpoint at line 18 in Service3))
    Service3.doSomeWork() line: 18
    Service3$$FastClassBySpringCGLIB$$6085f5a4.invoke(int, Object, Object[]) line: not available
    MethodProxy.invoke(Object, Object[]) line: 218
    CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 793
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 163
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763
    MethodInvocationProceedingJoinPoint.proceed() line: 89
    BulkheadAspect.lambda$handleJoinPointCompletableFuture$0(ProceedingJoinPoint) line: 225
    1457434357.get() line: not available
    Bulkhead.lambda$decorateCompletionStage$1(Bulkhead, Supplier) line: 100
    1251257755.get() line: not available
    SemaphoreBulkhead(Bulkhead).executeCompletionStage(Supplier<CompletionStage<T>>) line: 557
    BulkheadAspect.handleJoinPointCompletableFuture(ProceedingJoinPoint, Bulkhead) line: 223
    BulkheadAspect.proceed(ProceedingJoinPoint, String, Bulkhead, Class<?>) line: 162
    BulkheadAspect.lambda$bulkheadAroundAdvice$5eb13a26$1(ProceedingJoinPoint, String, Bulkhead, Class) line: 129
    1746723773.apply() line: not available
    1746723773(CheckedFunction0<R>).lambda$andThen$ca02ab3$1(CheckedFunction1) line: 265
    1151489454.apply() line: not available
    BulkheadAspect.executeFallBack(ProceedingJoinPoint, String, Method, CheckedFunction0<Object>) line: 139
    ==> here BulkheadAspect.bulkheadAroundAdvice(ProceedingJoinPoint, Bulkhead) line: 128
    NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
    NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
    DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
    Method.invoke(Object, Object...) line: 566
    AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethodWithGivenArgs(Object[]) line: 634
    AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethod(JoinPoint, JoinPointMatch, Object, Throwable) line: 624
    AspectJAroundAdvice.invoke(MethodInvocation) line: 72
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 175
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763
    ExposeInvocationInterceptor.invoke(MethodInvocation) line: 97
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 186
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763
    AnnotationAsyncExecutionInterceptor(AsyncExecutionInterceptor).lambda$invoke$0(MethodInvocation, Method) line: 115
    1466446116.call() line: not available
    AsyncExecutionAspectSupport.lambda$doSubmit$3(Callable) line: 278
    409592088.get() line: not available
    CompletableFuture$AsyncSupply<T>.run() line: 1700
    ==> here SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run() line: 286
    Thread.run() line: 829
After running a 60-second load test against a method annotated with both @Async and @Bulkhead, the profiling tool showed that the application used only 34 of the 514 threads it created. This is an obvious waste of resources.
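The order of operations behind that waste (thread first, bulkhead check second) can be imitated with plain `java.util.concurrent`. This is only a simulation under simplifying assumptions: it uses neither Spring nor Resilience4j, the class `ThreadPerCallDemo` and its `run` method are hypothetical, and the numbers it prints are not the load-test figures above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of "one new thread per call, semaphore checked inside it".
class ThreadPerCallDemo {

    /** Returns {threadsCreated, callsExecuted, callsRejected}. */
    static int[] run(int calls, int permits) throws InterruptedException {
        Semaphore bulkhead = new Semaphore(permits);
        CountDownLatch allAttempted = new CountDownLatch(calls);
        AtomicInteger created = new AtomicInteger();
        AtomicInteger executed = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < calls; i++) {
            // Step 1: a brand-new thread is started for every call.
            Thread t = new Thread(() -> {
                // Step 2: only now, inside the new thread, is the bulkhead consulted.
                boolean acquired = bulkhead.tryAcquire();
                allAttempted.countDown();
                if (!acquired) {
                    rejected.incrementAndGet(); // the thread was created for nothing
                    return;
                }
                try {
                    // Stand-in for slow work: keep the permit until every call has tried.
                    allAttempted.await();
                    executed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    bulkhead.release();
                }
            });
            created.incrementAndGet();
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new int[] {created.get(), executed.get(), rejected.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(10, 2);
        // prints: created=10 executed=2 rejected=8
        System.out.println("created=" + r[0] + " executed=" + r[1] + " rejected=" + r[2]);
    }
}
```

With 10 calls and 2 permits, all 10 threads are created, yet only 2 of them do any work. The other 8 exist only to discover that the bulkhead is full.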
ThreadPoolTaskExecutor.
Another option could be to use the ThreadPoolTaskExecutor implementation.
After running the same test with this implementation, the number of created threads dropped sharply, to 41.
However, this approach is redundant: in my opinion, using a thread pool and a semaphore together has no real advantage, because both are limiting the same concurrent calls.
Conclusion.
To limit asynchronous calls, use a threadPool bulkhead implementation instead of combining @Async with a semaphore bulkhead.
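As a rough illustration of why a thread-pool bulkhead avoids the wasted threads, the sketch below uses a bounded `ThreadPoolExecutor` from the JDK. It is not Resilience4j's thread-pool bulkhead (which, as far as I know, is selected with `type = Bulkhead.Type.THREADPOOL` on the annotation); the class `ThreadPoolBulkheadDemo`, its `run` method, and the pool and queue sizes are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of a bounded pool acting as the bulkhead itself.
class ThreadPoolBulkheadDemo {

    /** Returns {threadsCreated, callsAccepted, callsRejected}. */
    static int[] run(int calls, int poolSize, int queueCapacity) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                runnable -> {
                    created.incrementAndGet(); // count every thread the pool really creates
                    return new Thread(runnable);
                },
                new ThreadPoolExecutor.AbortPolicy());

        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < calls; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // stand-in for slow work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // Rejected on the caller's thread: no new thread was started for this call.
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {created.get(), accepted, rejected};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(10, 2, 1);
        // prints: created=2 accepted=3 rejected=7
        System.out.println("created=" + r[0] + " accepted=" + r[1] + " rejected=" + r[2]);
    }
}
```

Here the limit and the threads come from the same component, so a rejected call costs the caller an exception and nothing more.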