I wanted to modify my code to bulk insert rows in a transaction instead of row by row.
I know this isn't the fastest way to insert data in a PostgreSQL database but that's not the point of the question.
Here is my code
pub async fn bulk_insert_foos(self, foos: Vec<Foo>) -> Result<(), Error> {
let chunks = foos.chunks(10_000);
for chunk in chunks {
&self.insert_foo_chunk(chunk).await?;
}
Ok(())
}
async fn insert_foo_chunk(&self, chunk: &[Foo]) -> Result<(), Error>
{
let mut tx = self.pool.begin().await?;
for foo in chunk {
let _ = &self.insert_foo(&mut tx, foo).await?;
}
tx.commit().await?;
Ok(())
}
async fn insert_foo(&self, tx: &mut Transaction<Postgres>, foo: &Foo) -> Result<(), Error> {
let _result = sqlx::query(
r#"INSERT INTO public.foo (bar) VALUES($1)"#)
.bind(&foo.bar)
.execute(&mut *tx).await
.map_err(|e| {
println!("{e}");
dbg!(&foo);
e
})?;
Ok(())
}
The problem with this code is that the compiler is telling me on Transaction<Postgres>
implicit elided lifetime not allowed here [E0726]
expected lifetime parameter
Help: indicate the anonymous lifetime
I first tried to specify a simple anonymous lifetime like Transaction<'_, Postgres>
but that didn't work. I got a new message
the trait bound `&mut sqlx::Transaction<'_, Postgres>: Executor<'_>` is not satisfied [E0277]
the trait `Executor<'_>` is not implemented for `&mut sqlx::Transaction<'_, Postgres>`
Help: the following other types implement trait `Executor<'c>`:
`&'c mut PgConnection` implements `Executor<'c>`
`&'c mut PgListener` implements `Executor<'c>`
`&Pool<DB>` implements `Executor<'p>`
Note: required by a bound in `Query::<'q, DB, A>::execute`
So I researched a bit and found this page in SQLx's documentation which states:
The Executor impls for Transaction and PoolConnection have been deleted because they cannot exist in the new crate architecture without rewriting the Executor trait entirely. To fix this breakage, simply add a dereference where an impl Executor is expected, as they both dereference to the inner connection type which will still implement it:
&mut transaction -> &mut *transaction &mut connection -> &mut *connection
Checking the source code in transaction.rs it seems indeed the Executor
trait implementation has been commented out for the Transaction
type.
But the execute
function on the Query
type requires an Executor
pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
where
'q: 'e,
A: 'e,
E: Executor<'c, Database = DB>,
{
executor.execute(self).await
}
The Transaction page in the documentation says: A transaction can be used as an Executor when performing queries
but I don't know how to do that with lifetimes specified.
So is this a dead end? How can I make the above code compiling?
There's no reason this function needs to accept a transaction; it can accept any Executor
. So, just bound on that:
async fn insert_foo<'a, E>(&self, tx: E, foo: &Foo) -> Result<(), Error>
where
E: Executor<'a>,
This is going to reveal that more bounds are required. The simplest way to solve this is to just require an executor for a Postgres database:
async fn insert_foo<'a, E>(&self, tx: E, foo: &Foo) -> Result<(), Error>
where
E: Executor<'a, Database = Postgres>,
Now you can use this function with any Postgres executor, including transactions and connections.
It is possible to express the bounds differently and have it work with other databases, but doing so requires knowing what Foo
is, which you haven't told us.
Alternatively, you can fix this by adding one more dereference to the execute
parameter:
.execute(&mut **tx)
To understand why this is necessary, let's go through it step by step.
tx
is &mut Transaction
.*tx
is Transaction
.&mut *tx
is &mut Transaction
.So this is just a reborrow expression, and you might as well pass tx
. However, when we add another dereference:
tx
is &mut Transaction
.*tx
is Transaction
.**tx
is Connection
, by the DerefMut
implementation of Transaction
.&mut **tx
is &mut Connection
, which does implement Executor
.