Given a sample weather API and its corresponding cache wrapper (StreamCache),
which needs to always return the most recent value the API has delivered.
My approach (since subscribe needs to signal fetch before fetch can complete, and since both of these futures will be polled concurrently, in theory it should work):
let temperature_data = join!(api.subscribe(), api.fetch());
How do I retrieve the results from both of these async functions? I tried using join!, but somehow it waits forever.
use async_trait::async_trait;
use futures::join;
use futures::stream::BoxStream;
use std::{collections::HashMap, result::Result, sync::{Arc, Mutex}};
/// City name; the key type of the map returned by `Api::fetch`.
type City = String;
/// Temperature reading for a city (the unit is not specified in this snippet).
type Temperature = u64;
/// Weather data source wrapped by `StreamCache`.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait Api: Send + Sync + 'static {
/// Resolves to a map of city -> temperature, or to an error message.
async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
/// Resolves to a stream of `(city, temperature)` updates; each stream item
/// may instead be an error message.
async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
}
/// Cache of temperatures keyed by city name.
pub struct StreamCache {
// Shared, mutex-protected map read by `get`.
results: Arc<Mutex<HashMap<String, u64>>>,
}
impl StreamCache {
/// Creates an empty cache and awaits `api.subscribe()` and `api.fetch()`
/// together via `join!`.
///
/// NOTE(review): this is the version that hangs with the `TestApi` in this
/// file. `subscribe()` resolves to a stream, but that stream is never polled
/// here. `TestApi` only calls `notify_one()` from inside the stream, while
/// `TestApi::fetch` awaits `notified()`, so `fetch` (and therefore `join!`)
/// never completes.
pub async fn new(api: impl Api) -> Self {
let instance = Self {
results: Arc::new(Mutex::new(HashMap::new())),
};
// The joined result is bound but never written into `instance.results`.
let temperature_data = join!(api.subscribe(), api.fetch());
// instance.update_in_background(api);
instance
}
/// Returns a copy of the cached value stored under `key`, if any.
///
/// Panics with "poisoned" if the mutex is poisoned.
pub fn get(&self, key: &str) -> Option<u64> {
let results = self.results.lock().expect("poisoned");
results.get(key).copied()
}
/// Placeholder: the body is empty and `api` is currently unused.
pub fn update_in_background(&self, api: impl Api) {
// todo: perform action
}
}
#[cfg(test)]
mod tests {
use tokio::sync::Notify;
use futures::{future, stream::select, FutureExt, StreamExt};
use maplit::hashmap;
use super::*;
/// Test double for `Api` in which `fetch` cannot complete until the
/// subscription stream has been polled.
#[derive(Default)]
struct TestApi {
// Signalled from inside the subscription stream; awaited by `fetch`.
signal: Arc<Notify>,
}
#[async_trait]
impl Api for TestApi {
async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
// fetch is slow and may get delayed until after we receive the first updates
self.signal.notified().await;
Ok(hashmap! {
"Berlin".to_string() => 29,
"Paris".to_string() => 31,
})
}
async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
let results = vec![
Ok(("London".to_string(), 27)),
Ok(("Paris".to_string(), 32)),
];
// Merge the two fixed updates with a second stream whose only job is to
// call `notify_one()` once it is polled and then stay pending forever,
// so the merged stream never ends.
select(
futures::stream::iter(results),
async {
self.signal.notify_one();
future::pending().await
}.into_stream(),
).boxed()
}
}
/// Builds the cache from `TestApi`; the assertions are still commented out.
#[tokio::test]
async fn works() {
let cache = StreamCache::new(TestApi::default()).await;
// Allow cache to update
// time::sleep(Duration::from_millis(100)).await;
// assert_eq!(cache.get("Berlin"), Some(29));
// assert_eq!(cache.get("London"), Some(27));
// assert_eq!(cache.get("Paris"), Some(32));
}
}
Does it make sense to change your subscribe() implementation to the following?
/// Proposed test-double subscription: signals `fetch` as soon as `subscribe`
/// is awaited, then returns a finite stream of two fixed updates.
async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
    // Signal before building the stream, so the notification no longer
    // depends on the returned stream being polled.
    self.signal.notify_one();

    let to_update = |(city, temperature): (&str, Temperature)| -> Result<(City, Temperature), String> {
        Ok((city.to_string(), temperature))
    };
    let updates = vec![("London", 27), ("Paris", 32)]
        .into_iter()
        .map(to_update);

    futures::stream::iter(updates).boxed()
}
Alternatively, to change StreamCache instead of Api:
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use std::{
collections::HashMap,
result::Result,
sync::{Arc, Mutex},
};
/// City name; the key type of the map returned by `Api::fetch`.
type City = String;
/// Temperature reading for a city (the unit is not specified in this snippet).
type Temperature = u64;
/// Weather data source wrapped by `StreamCache`.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait Api: Send + Sync + 'static {
/// Resolves to a map of city -> temperature, or to an error message.
async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
/// Resolves to a stream of `(city, temperature)` updates; each stream item
/// may instead be an error message.
async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
}
/// Cache of temperatures keyed by city name.
///
/// `Clone` copies the `Arc`, so every clone reads and writes the same map.
#[derive(Debug, Clone)]
pub struct StreamCache {
// Shared, mutex-protected map written by `new` and read by `get`.
results: Arc<Mutex<HashMap<String, u64>>>,
}
impl StreamCache {
    /// Builds a cache and starts keeping it up to date from `api`.
    ///
    /// Two sources feed the cache:
    ///
    /// 1. A spawned Tokio task drains `api.subscribe()` and writes every
    ///    successful update into the cache. Error items are reported on stderr
    ///    and skipped, so one bad item does not stop later updates.
    /// 2. `api.fetch()` is awaited and merged in, but only for cities the
    ///    subscription has not written yet; subscription data is treated as
    ///    the more recent value.
    ///
    /// The subscription task is spawned *before* `fetch` is awaited, so an
    /// API whose `fetch` only completes once the stream is being consumed
    /// (like the `TestApi` in this file) cannot deadlock.
    ///
    /// # Panics
    ///
    /// Panics if `api.fetch()` returns an error or if the internal mutex is
    /// poisoned. Because it calls `tokio::spawn`, it must run inside a Tokio
    /// runtime.
    pub async fn new<T: Api>(api: T) -> Self {
        let instance = Self {
            results: Arc::new(Mutex::new(HashMap::new())),
        };
        let api = Arc::new(api);

        // Background task: apply streamed updates for as long as the stream lasts.
        let cache = instance.clone();
        let source = Arc::clone(&api);
        tokio::spawn(async move {
            let mut updates = source.subscribe().await;
            while let Some(item) = updates.next().await {
                match item {
                    Ok((city, temperature)) => {
                        // The guard is dropped at the end of this arm, before
                        // the next `.await`.
                        let mut map = cache.results.lock().expect("poisoned");
                        map.insert(city, temperature);
                        println!("{:?}", map);
                    }
                    // Skip failed items instead of ending the task.
                    Err(err) => eprintln!("subscription update failed: {}", err),
                }
            }
        });

        // Initial snapshot. The inner scope releases the guard before
        // `instance` is moved out as the return value.
        {
            let snapshot = api.fetch().await.expect("initial fetch failed");
            let mut map = instance.results.lock().expect("poisoned");
            for (city, temperature) in snapshot {
                // Only fill gaps: never overwrite a value the subscription
                // has already delivered.
                map.entry(city).or_insert(temperature);
            }
            println!("{:?}", map);
        }

        instance
    }

    /// Returns a copy of the cached temperature stored under `key`, if any.
    ///
    /// # Panics
    ///
    /// Panics with "poisoned" if the internal mutex is poisoned.
    pub fn get(&self, key: &str) -> Option<u64> {
        let results = self.results.lock().expect("poisoned");
        results.get(key).copied()
    }

    /// Placeholder: the body is empty and both `self` and `api` are unused.
    pub fn update_in_background(self, api: impl Api) {
        // todo: perform action
    }
}
#[cfg(test)]
mod tests {
use tokio::sync::Notify;
use futures::{future, stream::select, FutureExt, StreamExt};
use maplit::hashmap;
use super::*;
/// Test double for `Api` in which `fetch` cannot complete until the
/// subscription stream has been polled.
#[derive(Default)]
struct TestApi {
// Signalled from inside the subscription stream; awaited by `fetch`.
signal: Arc<Notify>,
}
#[async_trait]
impl Api for TestApi {
async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
// fetch is slow and may get delayed until after we receive the first updates
self.signal.notified().await;
Ok(hashmap! {
"Berlin".to_string() => 29,
"Paris".to_string() => 31,
})
}
async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
let results = vec![
Ok(("London".to_string(), 27)),
Ok(("Paris".to_string(), 32)),
];
// Merge the two fixed updates with a second stream whose only job is to
// call `notify_one()` once it is polled and then stay pending forever,
// so the merged stream never ends.
select(
futures::stream::iter(results),
async {
self.signal.notify_one();
future::pending().await
}
.into_stream(),
)
.boxed()
}
}
/// Checks that the cache holds both sources' data:
/// "Berlin" comes only from `fetch`, "London" only from the subscription,
/// and for "Paris" the subscription's 32 is expected rather than `fetch`'s 31.
#[tokio::test]
async fn works() {
let cache = StreamCache::new(TestApi::default()).await;
// Allow cache to update
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(cache.get("Berlin"), Some(29));
assert_eq!(cache.get("London"), Some(27));
assert_eq!(cache.get("Paris"), Some(32));
}
}
// Empty entry point; presumably present only so the snippet builds as a binary crate.
fn main() {}
PS ***> cargo test -- --nocapture
Compiling *** v0.1.0 (***)
.
.
.
running 1 test
{"London": 27}
{"Paris": 32, "London": 27}
{"Paris": 32, "Berlin": 29, "London": 27}
test tests::works ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.10s
*/