-
- Notifications
You must be signed in to change notification settings - Fork 459
Description
Problem Statement
I'm using libraries like below.
spring boot 3.5.6, sentry 8.21.1
io.sentry:sentry-spring-boot-starter-jakarta
io.sentry:sentry-log4j2
org.springframework.boot:spring-boot-starter-actuator(include io.micrometer)
pentelemetry-java-instrumentation 2.19.0
by adding agent like -javaagent:~/Documents/opentelemetry-javaagent.jar -XX:+EnableDynamicAgentLoading
(is there any difference between mode of importing dependency directly and adding jar with javaagent)
I'm using these combinations, and occasionally I query about 200,000 records from MongoDB.
Looking at the callTree, I see that the process io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) -> io.micrometer.context.DefaultContextSnapshotFactory.setThreadLocalsFrom(Object, String[]) -> io.sentry.Scope.(Scope) -> io.sentry.SynchronizedQueue.toArray(Object[]) -> java.util.concurrent.locks.ReentrantLock$Sync.lock() takes 500ms, and this builds up, ultimately causing a bottleneck of over 2 seconds.
I suspect that each reactor operator needs a lock to copy breadcrumbs when copying the scope, and this lock is causing contention between threads.
Setting maxBreadCrumbs=0 makes the method disappear from callTree, but causes a bottleneck in java.util.concurrent.CopyOnWriteArrayList.(Collection).
stackTrace is like below.
jdk.internal.misc.Unsafe.park(boolean, long) java.util.concurrent.locks.LockSupport.park(Object) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer$Node, int, boolean, boolean, boolean, long) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(int) java.util.concurrent.locks.ReentrantLock$Sync.lock() java.util.concurrent.locks.ReentrantLock.lock() # perhaps possible bottleneck io.sentry.util.AutoClosableReentrantLock.acquire() io.sentry.SynchronizedQueue.toArray(Object[]) io.sentry.Scope.<init>(Scope) io.sentry.Scope.clone() io.sentry.Scopes.forkedScopes(String) io.sentry.Sentry.getCurrentScopes() io.sentry.reactor.SentryReactorThreadLocalAccessor.getValue() io.sentry.reactor.SentryReactorThreadLocalAccessor.getValue() # scope object of sentry spec, its context retains at ThreadLocal, this methos is called every reactor operator runs io.micrometer.context.DefaultContextSnapshot.clearThreadLocal(Object, ThreadLocalAccessor, Map) io.micrometer.context.DefaultContextSnapshotFactory.setAllThreadLocalsFrom(Object, ContextRegistry, boolean) io.micrometer.context.DefaultContextSnapshotFactory.setThreadLocalsFrom(Object, String[]) reactor.core.publisher.ContextPropagation.setThreadLocals(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.onNext(ParallelMergeSequential$MergeSequentialInner, Object) reactor.core.publisher.ParallelMergeSequential$MergeSequentialInner.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync() reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run() io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run() io.micrometer.context.ContextSnapshot.lambda$wrap$0(Runnable) reactor.core.scheduler.WorkerTask.call() reactor.core.scheduler.WorkerTask.call() java.util.concurrent.FutureTask.run() java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run() java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.runWith(Object, Runnable) java.lang.Thread.run()
stackTrace when maxBreadCrumbs=0
java.lang.Object.getClass() java.util.concurrent.CopyOnWriteArrayList.<init>(Collection) // perhaps, a possible bottleneck io.sentry.Scope.<init>(Scope) io.sentry.Scope.clone() io.sentry.Scopes.forkedScopes(String) io.sentry.Sentry.getCurrentScopes() io.sentry.reactor.SentryReactorThreadLocalAccessor.getValue() io.sentry.reactor.SentryReactorThreadLocalAccessor.getValue() io.micrometer.context.DefaultContextSnapshot.clearThreadLocal(Object, ThreadLocalAccessor, Map) io.micrometer.context.DefaultContextSnapshotFactory.setAllThreadLocalsFrom(Object, ContextRegistry, boolean) io.micrometer.context.DefaultContextSnapshotFactory.setThreadLocalsFrom(Object, String[]) reactor.core.publisher.ContextPropagation.setThreadLocals(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap$FlatMapInner, Object) reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable$FuseableContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(long) reactor.core.publisher.FluxIterable$IterableSubscription.request(long) reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable$FuseableContextWriteRestoringThreadLocalsSubscriber.request(long) reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(Subscription) reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable$FuseableContextWriteRestoringThreadLocalsSubscriber.onSubscribe(Subscription) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(Subscription) reactor.core.publisher.FluxIterable.subscribe(CoreSubscriber, Spliterator, boolean, Runnable) reactor.core.publisher.FluxIterable.subscribe(CoreSubscriber) reactor.core.publisher.Flux.subscribe(Subscriber) reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap$FlatMapInner, Object) reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.MonoZip$ZipCoordinator.signal() reactor.core.publisher.MonoZip$ZipInner.onNext(Object) reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable$FuseableContextWriteRestoringThreadLocalsSubscriber.onNext(Object) io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(Object) reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty() reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete() reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete() io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onComplete() reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete() reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete() io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onComplete() reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.drainLoop() reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.drain() reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.onComplete() reactor.core.publisher.ParallelMergeSequential$MergeSequentialInner.onComplete() reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete() io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onComplete() reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(boolean, boolean, Subscriber, Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop() reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(Object) reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete() reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete() io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onComplete() reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.doComplete(Subscriber) reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.checkTerminated(boolean, boolean, Subscriber, Object) reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync() reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run() io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run() io.micrometer.context.ContextSnapshot.lambda$wrap$0(Runnable) reactor.core.scheduler.WorkerTask.call() reactor.core.scheduler.WorkerTask.call() java.util.concurrent.FutureTask.run() java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run() java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) java.util.concurrent.ThreadPoolExecutor$Worker.run() java.lang.Thread.runWith(Object, Runnable) java.lang.Thread.run()
it seems the possible bottleneck comes from io.sentry.SynchronizedQueue.toArray(Object[]), java.util.concurrent.CopyOnWriteArrayList.(Collection), especially while running FluxFlatMap
such as Flux.fromIterable(list).flatMap(repository::fetch).
because, flatMap(repository::fetch) will be run by multi threading, and scope from Flux.fromIterable(list) will be copied for every flatMap on each thread, though, thread will not be run parallelly by blocking method such as acquiring lock at SynchronizedQueue.toArray()
thanks a lot sincerely
Solution Brainstorm
No response
Metadata
Metadata
Assignees
Labels
Projects
Status