Beginner in both Rust and async programming here.
I have a function that downloads and stores a bunch of tweets in the database:
pub async fn process_user_timeline(config: &Settings, pool: &PgPool, user_object: &Value) {
    // get timeline
    if let Ok((user_timeline, _)) =
        get_user_timeline(config, user_object["id"].as_str().unwrap()).await
    {
        // store tweets
        if let Some(tweets) = user_timeline["data"].as_array() {
            for tweet in tweets.iter() {
                store_tweet(pool, &tweet, &user_timeline, "normal")
                    .await
                    .unwrap_or_else(|e| {
                        println!(
                            ">>>X>>> failed to store tweet {}: {:?}",
                            tweet["id"].as_str().unwrap(),
                            e
                        )
                    });
            }
        }
    }
}
It's being called in an asynchronous loop by another function:
pub async fn loop_until_hit_rate_limit<'a, T, Fut>(
    object_arr: &'a [T],
    settings: &'a Settings,
    pool: &'a PgPool,
    f: impl Fn(&'a Settings, &'a PgPool, &'a T) -> Fut + Copy,
    rate_limit: usize,
) where
    Fut: Future,
{
    let total = object_arr.len();
    let capped_total = min(total, rate_limit);
    let mut futs = vec![];
    for (i, object) in object_arr[..capped_total].iter().enumerate() {
        futs.push(async move {
            println!(">>> PROCESSING {}/{}", i + 1, total);
            f(settings, pool, object).await;
        });
    }
    futures::future::join_all(futs).await;
}
Sometimes two async tasks will try to insert the same tweet at the same time, producing this error:
failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error, code: "23505", message: "duplicate key value violates unique constraint \"tweets_tweet_id_key\"", detail: Some("Key (tweet_id)=(1398307091442409475) already exists."), hint: None, position: None, where: None, schema: Some("public"), table: Some("tweets"), column: None, data_type: None, constraint: Some("tweets_tweet_id_key"), file: Some("nbtinsert.c"), line: Some(656), routine: Some("_bt_check_unique") })
Mind that the code already checks whether a tweet is present before inserting it, so this only happens in the following scenario: READ from task 1 > READ from task 2 > WRITE from task 1 (success) > WRITE from task 2 (error).
To solve this, my best attempt so far has been to add an unwrap_or_else() clause, which lets one of the tasks fail without panicking out of the entire execution. I am aware of at least one drawback: sometimes both tasks will bail out and the tweet never gets written. It happens in <1% of cases, but it happens.
Are there other drawbacks to my approach I'm not aware of?
What's the right way to handle this? I hate losing data, and even worse doing so non-deterministically.
PS I'm using actix web and sqlx as my webserver / db libraries.
Generally for anything that may be written by multiple threads/processes, any logic like
if (!exists) {
    writeValue()
}
needs to either be protected by some kind of lock, or the code needs to be changed to write atomically with the possibility the write will fail because something else already wrote to it.
For in-memory data in Rust you'd use a Mutex to ensure that you can read and then write the data back before anything else reads it, or an atomic type (from std::sync::atomic) to modify the data in such a way that if something else already wrote it, you can detect that.
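As a minimal sketch of both in-memory options, using only the standard library: the names record_once, claim and count_winners are made up for illustration and are not from your code. In record_once the "does it exist?" check and the write happen under a single lock, and in claim the compare-and-swap either succeeds or tells you someone else got there first.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: returns true only for the first caller that records `id`.
/// The membership check and the insert happen while the lock is held,
/// so no other thread can run between them.
fn record_once(seen: &Mutex<HashSet<u64>>, id: u64) -> bool {
    let mut guard = seen.lock().unwrap();
    // HashSet::insert returns false if the value was already present.
    guard.insert(id)
}

/// Hypothetical helper: returns true only for the caller that flips the flag
/// from false to true. Every later caller sees the exchange fail.
fn claim(flag: &AtomicBool) -> bool {
    flag.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
}

/// Spawns `workers` threads that all try to record the same id and
/// returns how many of them succeeded.
fn count_winners(workers: usize, id: u64) -> usize {
    let seen = Arc::new(Mutex::new(HashSet::<u64>::new()));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let seen = Arc::clone(&seen);
            thread::spawn(move || record_once(&seen, id))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&won| won)
        .count()
}
```

However many threads race in count_winners, exactly one of them gets true back, which is the property your READ-then-WRITE sequence is missing.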
In databases, for any query that might conflict with some other query happening around the same time, you'd want to use an ON CONFLICT clause in your query so that the database itself knows what to do when it tries to write data and it already exists.
For your case, since I'm guessing the tweets are immutable, you'd likely want to do ON CONFLICT (tweet_id) DO NOTHING (or whatever your ID column is), in which case the INSERT will skip inserting if there is already a tweet with the ID you are inserting, and it won't raise an error.
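Here is a rough sketch of what that statement could look like. The tweets table and tweet_id column are taken from your error message; the body column, the INSERT_TWEET_SQL constant and the was_inserted helper are assumptions for illustration, since I can't see your store_tweet function. The sqlx call is shown only as a comment, so adapt it to your actual columns and types.

```rust
/// Hypothetical insert statement. `tweets` and `tweet_id` come from the error
/// message in the question; the `body` column is an assumption.
const INSERT_TWEET_SQL: &str = "\
INSERT INTO tweets (tweet_id, body) \
VALUES ($1, $2) \
ON CONFLICT (tweet_id) DO NOTHING";

/// Interprets the affected-row count reported for the statement above:
/// 1 means this task inserted the tweet, 0 means the row already existed
/// and the insert was skipped without an error.
fn was_inserted(rows_affected: u64) -> bool {
    rows_affected == 1
}

// With sqlx, the statement would be executed roughly like this
// (sketch only, not compiled here; `tweet_id` and `body` are placeholders):
//
//     let result = sqlx::query(INSERT_TWEET_SQL)
//         .bind(tweet_id)
//         .bind(body)
//         .execute(pool)
//         .await?;
//     let inserted = was_inserted(result.rows_affected());
```

With this in place the separate "is it already there?" SELECT is no longer needed for correctness, and a task that loses the race simply sees zero rows affected instead of a 23505 error.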