I am trying to consume logical replication data from postgres.
I follow this.
First I establish pg connection as:
pub async fn logical_replication_connection() -> Result<tokio_postgres::Client, Box<dyn std::error::Error>> {
println!("Consuming replication...");
let conn_str = "host=localhost user=postgres password=postgres dbname=mydb";
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
Ok(client)
}
This works fine. Then I use it as:
fn main() {
let tokio_rt = tokio::runtime::Runtime::new().unwrap();
tokio_rt.block_on(async {
match db::logical_replication_connection().await {
Ok(client) => {
println!("Connected client: {:?}", client);
process_replication(client).await;
}
Err(e) => eprintln!("Error connecting to database: {}", e),
}
});
}
async fn process_replication(client: tokio_postgres::Client) -> () {
println!("Processing replication....");
let query = "START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '1', publication_names 'my_pub')";
match client.simple_query(query).await {
Ok(messages) => {
for message in messages {
match message {
tokio_postgres::SimpleQueryMessage::Row(row) => {
println!("Replication message: {:?}", row);
}
_ => {}
}
}
}
Err(e) => eprintln!("Failed to start replication: {}", e),
}
}
Running this program throws error:
Failed to start replication: db error: ERROR: syntax error at or near "START_REPLICATION"
Seems 'START_REPLICATION' command is not understood by postgres.
What mistake am I making here?
tokio-postgres
does not support the streaming replication protocol. So replication
is not supported in their Config
which is the reason you can't add it to your connection options. See also the open issue #116 for adding support for it.