multithreadingrustasync-awaitrust-tokiotokio-postgres

Rust and tokio::postgresql, use of moved value


I am working on this kind of code:

database.rs

use tokio::task::JoinHandle;
use tokio_postgres::{Client, Connection, Error, NoTls, Socket, tls::NoTlsStream};

use crate::secret;

pub struct DatabaseConnection {
    pub client: Client,
    pub connection: Connection<Socket, NoTlsStream>
    // pub connection: JoinHandle<Connection<Socket, NoTlsStream>>
}

impl DatabaseConnection {

    async fn new() -> Result<DatabaseConnection, Error> {

        let (new_client, new_connection) = DatabaseConnection::create_connection().await.expect("Error, amazing is amazing");

        Ok(Self {
            client: new_client,
            // connection: tokio::spawn( async move { new_connection } )
            connection: new_connection 
        })
    }

    async fn create_connection() -> Result<(Client, Connection<Socket, NoTlsStream>), Error> {
            // Connect to the database.
        let (client, connection) =
            tokio_postgres::connect(
            &format!(
                "postgres://{user}:{pswd}@localhost/{db}",
                    user = secret::USERNAME, 
                    pswd = secret::PASSWORD, 
                    db = secret::DATABASE
                )[..], 
            NoTls)
            .await?;

        Ok((client, connection))
    }
}


pub async fn init_db() -> Result<(), Error> {

    let database_connection = 
        DatabaseConnection::new()
            .await;

    let db_connection = tokio::spawn(async move {
        if let Err(e) = database_connection {
            println!("Connection error: {:?}", e);
        }
    });

    let create = database_connection.unwrap().client
        .query("CREATE TABLE person  (
            id        SERIAL PRIMARY KEY,
            name      VARCHAR NOT NULLL;
        )", &[]).await?;
    
    Ok(())
    
}

main.rs


#[tokio::main]
async fn main() {
    
    match database::init_db().await {
        Ok(()) => println!("Successfully connected to the database"),
        Err(e) => eprintln!("On main: {}", e)
    }     
}

I can't use the database_connection variable to perform my SQL statements because it's moved into the tokio routine.

I already tried to return on my struct a `connection: tokio::spawn( async move { new_connection } ), but the routine it's never spawned, till i call the attribute, and returns an ended database connection.

How can I solve this?

Thanks in advice


Solution

  • Going with 2 concurrent tasks (a concurrent event loop for the connection) allows sharing your connection between various program modules, and have a non-blocking (potentially non-async) query API.

    It is possible to achieve by implementing a reactor pattern, but not as trivial as in some other languages, because Rust is going to make sure you follow the strict multi-thread correctness.

    Let's say that task 1 is your main program, it spawns task 2 - a DatabaseConnection reactor event loop. Since this instance might be accessed by multiple threads potentially at the same time to start or process an SQL query, wrap it with Arc<Mutex<DatabaseConnection>>.

    To execute a query task 1 needs to send an SQL command to task 2, and wait for the result. One way to do this is to use an mpsc channel for sending commands, and oneshot for the result. oneshot is similar to a promise/future in principle: you can await on one end, and push the value and wake from the other end.

    For some code example check out the channels tutorial. The "Spawn manager task" chapter is gonna be a part of your DatabaseConnection task 2 waiting for SQL queries and processing them. The "Receive responses" chapter that shows how to use oneshot to send the result back.

    Also note that async doesn't imply that you have to block on await. If your program is simple enough, there's a possiblity to avoid having the reactor while still not blocking. This can be done with tokio::select! inside a loop. An example of such usage can be found in the select tutorial - chapter "Resuming an async operation". Imagine that action() is your .query() method. Note that they are calling it, but not await-ing. Then the select! is able to return when the query operation results are ready, and if they are not ready - you are free to do any other async work.