Skip to content

Commit 1fc1846

Browse files
committed
transaction: document, test, and fix operation of nested transactions
1 parent f7fd833 commit 1fc1846

File tree

2 files changed

+142
-15
lines changed

2 files changed

+142
-15
lines changed

sqlx-core/src/transaction.rs

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::describe::Describe;
99
use crate::executor::{Execute, Executor, RefExecutor};
1010
use crate::runtime::spawn;
1111

12+
/// Represents a database transaction.
1213
// Transaction<PoolConnection<PgConnection>>
1314
// Transaction<PgConnection>
1415
pub struct Transaction<T>
@@ -38,10 +39,14 @@ where
3839
})
3940
}
4041

41-
pub async fn begin(mut self) -> crate::Result<Transaction<T>> {
42-
Transaction::new(self.depth, self.inner.take().expect(ERR_FINALIZED)).await
42+
/// Creates a new save point in the current transaction and returns
43+
/// a new `Transaction` object to manage its scope.
44+
pub async fn begin(self) -> crate::Result<Transaction<Transaction<T>>> {
45+
Transaction::new(self.depth, self).await
4346
}
4447

48+
/// Commits the current transaction or save point.
49+
/// Returns the inner connection or transaction.
4550
pub async fn commit(mut self) -> crate::Result<T> {
4651
let mut inner = self.inner.take().expect(ERR_FINALIZED);
4752
let depth = self.depth;
@@ -57,6 +62,8 @@ where
5762
Ok(inner)
5863
}
5964

65+
/// Rollback the current transaction or save point.
66+
/// Returns the inner connection or transaction.
6067
pub async fn rollback(mut self) -> crate::Result<T> {
6168
let mut inner = self.inner.take().expect(ERR_FINALIZED);
6269
let depth = self.depth;
@@ -95,15 +102,49 @@ where
95102
}
96103
}
97104

98-
impl<'c, DB, T> Executor for &'c mut Transaction<T>
105+
impl<T> Connection for Transaction<T>
106+
where
107+
T: Connection,
108+
{
109+
// Close is equivalent to
110+
fn close(mut self) -> BoxFuture<'static, crate::Result<()>> {
111+
Box::pin(async move {
112+
let mut inner = self.inner.take().expect(ERR_FINALIZED);
113+
114+
if self.depth == 1 {
115+
// This is the root transaction, call rollback
116+
let res = inner.execute("ROLLBACK").await;
117+
118+
// No matter the result of the above, call close
119+
let _ = inner.close().await;
120+
121+
// Now raise the error if there was one
122+
res?;
123+
} else {
124+
// This is not the root transaction, forward to a nested
125+
// transaction (to eventually call rollback)
126+
inner.close().await?
127+
}
128+
129+
Ok(())
130+
})
131+
}
132+
133+
#[inline]
134+
fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>> {
135+
self.deref_mut().ping()
136+
}
137+
}
138+
139+
impl<DB, T> Executor for Transaction<T>
99140
where
100141
DB: Database,
101142
T: Connection<Database = DB>,
102143
{
103144
type Database = T::Database;
104145

105-
fn execute<'e, 'q: 'e, 't: 'e, E: 'e>(
106-
&'t mut self,
146+
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
147+
&'c mut self,
107148
query: E,
108149
) -> BoxFuture<'e, crate::Result<u64>>
109150
where
@@ -112,7 +153,7 @@ where
112153
(**self).execute(query)
113154
}
114155

115-
fn fetch<'q, 'e, E>(&'e mut self, query: E) -> <Self::Database as HasCursor<'e, 'q>>::Cursor
156+
fn fetch<'e, 'q, E>(&'e mut self, query: E) -> <Self::Database as HasCursor<'e, 'q>>::Cursor
116157
where
117158
E: Execute<'q, Self::Database>,
118159
{
@@ -151,16 +192,9 @@ where
151192
{
152193
fn drop(&mut self) {
153194
if self.depth > 0 {
154-
if let Some(mut inner) = self.inner.take() {
195+
if let Some(inner) = self.inner.take() {
155196
spawn(async move {
156-
let res = inner.execute("ROLLBACK").await;
157-
158-
// If the rollback failed we need to close the inner connection
159-
if res.is_err() {
160-
// This will explicitly forget the connection so it will not
161-
// return to the pool
162-
let _ = inner.close().await;
163-
}
197+
let _ = inner.close().await;
164198
});
165199
}
166200
}

tests/postgres.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,99 @@ async fn it_can_work_with_transactions() -> anyhow::Result<()> {
153153
Ok(())
154154
}
155155

156+
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
157+
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
158+
async fn it_can_work_with_nested_transactions() -> anyhow::Result<()> {
159+
let mut conn = new::<Postgres>().await?;
160+
161+
conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_2523 (id INTEGER PRIMARY KEY)")
162+
.await?;
163+
164+
conn.execute("TRUNCATE _sqlx_users_2523").await?;
165+
166+
// begin
167+
let mut tx = conn.begin().await?;
168+
169+
// insert a user
170+
sqlx::query("INSERT INTO _sqlx_users_2523 (id) VALUES ($1)")
171+
.bind(50_i32)
172+
.execute(&mut tx)
173+
.await?;
174+
175+
// begin once more
176+
let mut tx = tx.begin().await?;
177+
178+
// insert another user
179+
sqlx::query("INSERT INTO _sqlx_users_2523 (id) VALUES ($1)")
180+
.bind(10_i32)
181+
.execute(&mut tx)
182+
.await?;
183+
184+
// never mind, rollback
185+
let mut tx = tx.rollback().await?;
186+
187+
// did we really?
188+
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_2523")
189+
.fetch_one(&mut tx)
190+
.await?;
191+
192+
assert_eq!(count, 1);
193+
194+
// actually, commit
195+
let mut conn = tx.commit().await?;
196+
197+
// did we really?
198+
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_2523")
199+
.fetch_one(&mut conn)
200+
.await?;
201+
202+
assert_eq!(count, 1);
203+
204+
Ok(())
205+
}
206+
207+
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
208+
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
209+
async fn it_can_rollback_nested_transactions() -> anyhow::Result<()> {
210+
let mut conn = new::<Postgres>().await?;
211+
212+
conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_512412 (id INTEGER PRIMARY KEY)")
213+
.await?;
214+
215+
conn.execute("TRUNCATE _sqlx_users_512412").await?;
216+
217+
// begin
218+
let mut tx = conn.begin().await?;
219+
220+
// insert a user
221+
sqlx::query("INSERT INTO _sqlx_users_512412 (id) VALUES ($1)")
222+
.bind(50_i32)
223+
.execute(&mut tx)
224+
.await?;
225+
226+
// begin once more
227+
let mut tx = tx.begin().await?;
228+
229+
// insert another user
230+
sqlx::query("INSERT INTO _sqlx_users_512412 (id) VALUES ($1)")
231+
.bind(10_i32)
232+
.execute(&mut tx)
233+
.await?;
234+
235+
// stop the phone, drop the entire transaction
236+
tx.close().await?;
237+
238+
// did we really?
239+
let mut conn = new::<Postgres>().await?;
240+
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_512412")
241+
.fetch_one(&mut conn)
242+
.await?;
243+
244+
assert_eq!(count, 0);
245+
246+
Ok(())
247+
}
248+
156249
// run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test`
157250
#[ignore]
158251
#[cfg_attr(feature = "runtime-async-std", async_std::test)]

0 commit comments

Comments
 (0)