rustrust-futures

Get result from both of `async` functions


Given sample weather API & it's corresponding cache wrapper (StreamCache), which needs to always return the most recent value the API has delivered.

My approach: (since subscribe needs to inform fetch to make it functional & since both of these futures will be polled simultaneously, in theory it should work)

let temperature_data = join!(api.subscribe(), api.fetch());

How do I retrieve result from both of these async functions ? I tried using join!, but somehow, it waits forever.

use async_trait::async_trait;
use futures::join;
use futures::stream::BoxStream;
use std::{collections::HashMap, result::Result, sync::{Arc, Mutex}};
type City = String;
type Temperature = u64;

#[async_trait]
pub trait Api: Send + Sync + 'static {
    async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
    async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
}

pub struct StreamCache {
    results: Arc<Mutex<HashMap<String, u64>>>,
}

impl StreamCache {
    pub async fn new(api: impl Api) -> Self {
        let instance = Self {
            results: Arc::new(Mutex::new(HashMap::new())),
        };

        let temperature_data = join!(api.subscribe(), api.fetch());

        // instance.update_in_background(api);
        instance
    }

    pub fn get(&self, key: &str) -> Option<u64> {
        let results = self.results.lock().expect("poisoned");
        results.get(key).copied()
    }

    pub fn update_in_background(&self, api: impl Api) {
        // todo: perform action
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::Notify;

    use futures::{future, stream::select, FutureExt, StreamExt};
    use maplit::hashmap;

    use super::*;

    #[derive(Default)]
    struct TestApi {
        signal: Arc<Notify>,
    }

    #[async_trait]
    impl Api for TestApi {
        async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
            // fetch is slow an may get delayed until after we receive the first updates
            self.signal.notified().await;
            Ok(hashmap! {
                "Berlin".to_string() => 29,
                "Paris".to_string() => 31,
            })
        }

        async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
            let results = vec![
                Ok(("London".to_string(), 27)),
                Ok(("Paris".to_string(), 32)),
            ];
            select(
                futures::stream::iter(results),
                async {
                    self.signal.notify_one();
                    future::pending().await
                }.into_stream(),
            ).boxed()
        }
    }

    #[tokio::test]
    async fn works() {
        let cache = StreamCache::new(TestApi::default()).await;

        // Allow cache to update
        // time::sleep(Duration::from_millis(100)).await;

        // assert_eq!(cache.get("Berlin"), Some(29));
        // assert_eq!(cache.get("London"), Some(27));
        // assert_eq!(cache.get("Paris"), Some(32));
    }
}

Solution

  • Does it make sense to change your subscribe() implementation to

    async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
        let results = vec![
            Ok(("London".to_string(), 27)),
            Ok(("Paris".to_string(), 32)),
        ];
        self.signal.notify_one();
        futures::stream::iter(results).boxed()
    }
    

    Edit

    To change StreamCache instead of Api

    use async_trait::async_trait;
    use futures::stream::BoxStream;
    use futures::StreamExt;
    use std::{
        collections::HashMap,
        result::Result,
        sync::{Arc, Mutex},
    };
    type City = String;
    type Temperature = u64;
    
    #[async_trait]
    pub trait Api: Send + Sync + 'static {
        async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
        async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
    }
    
    #[derive(Debug, Clone)]
    pub struct StreamCache {
        results: Arc<Mutex<HashMap<String, u64>>>,
    }
    
    
    impl StreamCache {
        pub async fn new<T: Api>(api: T) -> Self {
            let instance = Self {
                results: Arc::new(Mutex::new(HashMap::new())),
            };
    
            let api =  Arc::new(api);
            
            {
                let instance = instance.clone();
                let api = api.clone();
                // Spawn a new tokio thread to handle subscribed data
                tokio::spawn(async move {
                    let mut sub = api.subscribe().await;
                    while let Some(Ok((k, v))) = sub.next().await {
                        let mut map = instance.results.lock().unwrap();
                        map.insert(k, v);
                        println!("{:?}", map);
                    }
                });
            }
    
            // fetch a map
            // *we take the subscribe data as the latest*
            {
                let fetch_map = api.fetch().await.unwrap();
                let mut map = instance.results.lock().unwrap();
                for (k, v) in fetch_map {
                    if map.contains_key(&k) {
                        continue;
                    }
                    map.insert(k, v);
                }
                println!("{:?}", map)
            }
    
            instance
        }
    
        pub fn get(&self, key: &str) -> Option<u64> {
            let results = self.results.lock().expect("poisoned");
            results.get(key).copied()
        }
    
        pub fn update_in_background(self, api: impl Api) {
            // todo: perform action
        }
    }
    
    #[cfg(test)]
    mod tests {
        use tokio::sync::Notify;
    
        use futures::{future, stream::select, FutureExt, StreamExt};
        use maplit::hashmap;
    
        use super::*;
    
        #[derive(Default)]
        struct TestApi {
            signal: Arc<Notify>,
        }
    
        #[async_trait]
        impl Api for TestApi {
            async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
                // fetch is slow an may get delayed until after we receive the first updates
                self.signal.notified().await;
                Ok(hashmap! {
                    "Berlin".to_string() => 29,
                    "Paris".to_string() => 31,
                })
            }
    
            async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
                let results = vec![
                    Ok(("London".to_string(), 27)),
                    Ok(("Paris".to_string(), 32)),
                ];
                select(
                    futures::stream::iter(results),
                    async {
                        self.signal.notify_one();
                        future::pending().await
                    }
                    .into_stream(),
                )
                .boxed()
            }
        }
    
        #[tokio::test]
        async fn works() {
            let cache = StreamCache::new(TestApi::default()).await;
    
            // Allow cache to update
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    
            assert_eq!(cache.get("Berlin"), Some(29));
            assert_eq!(cache.get("London"), Some(27));
            assert_eq!(cache.get("Paris"), Some(32));
        }
    }
    fn main() {}
    

    Output

    PS ***> cargo test -- --nocapture
       Compiling *** v0.1.0 (***)
    
    .
    .
    .
    
    running 1 test
    {"London": 27}
    {"Paris": 32, "London": 27}
    {"Paris": 32, "Berlin": 29, "London": 27}
    test tests::works ... ok
    
    test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.10s
    */