Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public Publisher<T> first() {
.flatMap(batchCursor -> Mono.create(sink -> {
batchCursor.setBatchSize(1);
Mono.from(batchCursor.next())
.contextWrite(sink.contextView())
.doOnTerminate(batchCursor::close)
.doOnError(sink::error)
.doOnSuccess(results -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,25 @@ public void subscribe(final Subscriber<? super Void> s) {
sink.onCancel(() -> createCancellationMono(terminated).subscribe());

Consumer<Throwable> errorHandler = e -> createCancellationMono(terminated)
.contextWrite(sink.contextView())
.doOnError(i -> sink.error(e))
.doOnSuccess(i -> sink.error(e))
.subscribe();

Consumer<Long> saveFileDataMono = l -> createSaveFileDataMono(terminated, l)
.contextWrite(sink.contextView())
.doOnError(errorHandler)
.doOnSuccess(i -> sink.success())
.subscribe();

Consumer<Void> saveChunksMono = i -> createSaveChunksMono(terminated)
.contextWrite(sink.contextView())
.doOnError(errorHandler)
.doOnSuccess(saveFileDataMono)
.subscribe();

createCheckAndCreateIndexesMono()
.contextWrite(sink.contextView())
.doOnError(errorHandler)
.doOnSuccess(saveChunksMono)
.subscribe();
Expand Down Expand Up @@ -159,6 +163,7 @@ private Mono<Void> createCheckAndCreateIndexesMono() {
AtomicBoolean collectionExists = new AtomicBoolean(false);

return Mono.create(sink -> Mono.from(findPublisher.projection(PROJECTION).first())
.contextWrite(sink.contextView())
.subscribe(
d -> collectionExists.set(true),
sink::error,
Expand All @@ -167,6 +172,7 @@ private Mono<Void> createCheckAndCreateIndexesMono() {
sink.success();
} else {
checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX)
.contextWrite(sink.contextView())
.doOnError(sink::error)
.doOnSuccess(i -> {
checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX)
Expand Down Expand Up @@ -227,6 +233,7 @@ private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated) {
AtomicLong lengthInBytes = new AtomicLong(0);
AtomicInteger chunkIndex = new AtomicInteger(0);
new ResizingByteBufferFlux(source, chunkSizeBytes)
.contextWrite(sink.contextView())
.flatMap((Function<ByteBuffer, Publisher<InsertOneResult>>) byteBuffer -> {
if (terminated.get()) {
return Mono.empty();
Expand Down