Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,26 +357,41 @@ protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager
Mono<Void> afterCleanup = Mono.empty();

if (txObject.isMustRestoreAutoCommit()) {
afterCleanup = afterCleanup.then(Mono.from(con.setAutoCommit(true)));
Mono<Void> restoreAutoCommitStep = safeCleanupStep(
"doCleanupAfterCompletion when restoring autocommit", Mono.from(con.setAutoCommit(true)));
afterCleanup = afterCleanup.then(restoreAutoCommitStep);
}

return afterCleanup.then(Mono.defer(() -> {
Mono<Void> releaseConnectionStep = Mono.defer(() -> {
try {
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
}
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
return safeCleanupStep("doCleanupAfterCompletion when releasing R2DBC Connection",
ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()));
}
}
finally {
txObject.getConnectionHolder().clear();
}
return Mono.empty();
}));
});
return afterCleanup.then(releaseConnectionStep);
});
}

private Mono<Void> safeCleanupStep(String stepDescription, Mono<Void> stepMono) {
if (!logger.isDebugEnabled()) {
return stepMono.onErrorComplete();
}
else {
return stepMono.doOnError(e ->
logger.debug(String.format("Error ignored during %s: %s", stepDescription, e)))
.onErrorComplete();
}
}

private Mono<Void> switchAutoCommitIfNecessary(Connection con, Object transaction) {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
Mono<Void> prepare = Mono.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.Statement;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.r2dbc.BadSqlGrammarException;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.TransactionDefinition;
Expand Down Expand Up @@ -326,6 +329,34 @@ void testRollbackFails() {
verifyNoMoreInteractions(connectionMock);
}

@Test
@SuppressWarnings("unchecked")
void testConnectionReleasedWhenRollbackFails() {
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty());

TransactionalOperator operator = TransactionalOperator.create(tm);

when(connectionMock.isAutoCommit()).thenReturn(true);
when(connectionMock.setAutoCommit(true)).thenReturn(Mono.defer(() -> Mono.error(new R2dbcTimeoutException("SET AUTOCOMMIT = 1 timed out"))));
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
when(connectionMock.setAutoCommit(false)).thenReturn(Mono.empty());

operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> {
throw new IllegalStateException("Intentional error to trigger rollback");
}).then()).as(StepVerifier::create)
.verifyErrorSatisfies(e -> Assertions.assertThat(e)
.isInstanceOf(BadSqlGrammarException.class)
.hasCause(new R2dbcBadGrammarException("Rollback should fail"))
);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock, never()).commitTransaction();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
}

@Test
void testTransactionSetRollbackOnly() {
when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
Expand Down