I have a data pipeline where I am making API requests that return a Vec<T>. Each T has information that I will use to make another API request.
First, I stream requests and send the resulting Vec<T> on a channel; then my goal is to take that Vec, iterate over it, and spawn new tasks to make those next-step requests.
However, I am getting pretty lost in the design of this, and currently my code runs but doesn't produce anything meaningful.
I've tried moving the 2nd tokio::spawn call before the iterator call (inside the while let Some(r) = post_rx.recv().await loop), but iter() isn't async, so I receive compile errors.
Essentially, I am trying to figure out a way to consume responses via a channel and spawn further tasks from them.
Code below (it runs, but gets stuck on the second channel):
use futures::stream::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Post {
    pub user_id: u16,
    pub id: u16,
    pub title: String,
    pub body: String,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct User {
    pub id: u16,
    pub name: String,
    pub username: String,
    pub email: String,
    pub address: Address,
    pub phone: String,
    pub website: String,
    pub company: Company,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Address {
    pub street: String,
    pub suite: String,
    pub city: String,
    pub zipcode: String,
    pub geo: Geo,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Geo {
    pub lat: String,
    pub lng: String,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Company {
    pub name: String,
    pub catch_phrase: String,
    pub bs: String,
}
#[tokio::main]
async fn main() {
    let (post_tx, mut post_rx) = mpsc::channel::<Vec<Post>>(5);
    let (user_tx, mut user_rx) = mpsc::channel::<User>(5);
    let client = reqwest::Client::new();
    // create iterator that will stream async responses
    tokio::spawn(async move {
        let _ = futures::stream::iter((1..).step_by(1))
            .then(|i| {
                let client = client.clone();
                let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
                client.get(url).send()
            })
            .and_then(|resp| resp.json::<Vec<Post>>())
            .try_for_each_concurrent(2, |r| async {
                let post_tx_cloned = post_tx.clone();
                let _ = post_tx_cloned.send(r).await;
                Ok(())
            })
            .await;
    });
    // consume responses from our channel to do future things with results...
    let new_client = reqwest::Client::new();
    while let Some(r) = post_rx.recv().await {
        println!("received {} posts", r.len());
        // iterate over vec of posts to spawn other concurrent requests
        let posts = r.clone();
        let _ = posts.iter().map(|p| {
            let p = p.clone();
            let cloned_client = new_client.clone();
            let user_tx_cloned = user_tx.clone();
            // should I move this task spawn before the iter().map() call?
            tokio::spawn(async move {
                let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = p.user_id);
                let resp = cloned_client
                    .get(url)
                    .send()
                    .await
                    .unwrap()
                    .json::<User>()
                    .await;
                println!("{:?}", resp);
                user_tx_cloned.send(resp.unwrap()).await.unwrap();
            });
        });
        // quit if the response comes back empty
        if r.len() == 0 {
            break;
        }
    }
    // read from the user channel
    while let Some(user) = user_rx.recv().await {
        println!("{:?}", user);
    }
}
EDIT: I updated main to use a for loop inside the spawned task (instead of iter().map()), and it now consumes users on the 2nd channel. I believe the issue was that I never consumed the iterator returned by .map() in the prior code: iterator adaptors are lazy, so the closures (and the tokio::spawn calls inside them) never ran. It seems simpler to just use a for loop in this situation. The code runs now, but never completes.
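Here is a minimal sketch of that laziness pitfall, with made-up values unrelated to the pipeline above:

fn main() {
    let nums = vec![1, 2, 3];
    // iterator adaptors are lazy: this map is never consumed,
    // so the closure never runs and nothing is printed
    let _ = nums.iter().map(|n| println!("lazy: {n}"));
    // a for loop drives the iterator, so this does print
    for n in nums.iter() {
        println!("eager: {n}");
    }
}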
Edited code:
#[tokio::main]
async fn main() {
    let (post_tx, mut post_rx) = mpsc::channel::<Vec<Post>>(5);
    let (user_tx, mut user_rx) = mpsc::channel::<User>(5);
    let client = reqwest::Client::new();
    // create iterator that will stream async responses
    tokio::spawn(async move {
        let _ = futures::stream::iter((1..).step_by(1))
            .then(|i| {
                let client = client.clone();
                let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
                client.get(url).send()
            })
            .and_then(|resp| resp.json::<Vec<Post>>())
            .try_for_each_concurrent(2, |r| async {
                let post_tx_cloned = post_tx.clone();
                let _ = post_tx_cloned.send(r).await;
                Ok(())
            })
            .await;
    });
    // consume responses from our channel to do future things with results...
    while let Some(r) = post_rx.recv().await {
        println!("received {} posts", r.len());
        // iterate over vec of posts to spawn other concurrent requests
        let posts = r.clone();
        // for i in posts.iter() {
        //     println!("post: {:?}", i);
        // }
        let new_client = reqwest::Client::new();
        let cloned_client = new_client.clone();
        let user_tx_cloned = user_tx.clone();
        tokio::spawn(async move {
            for p in posts {
                let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = p.user_id);
                let resp = cloned_client
                    .get(url)
                    .send()
                    .await
                    .unwrap()
                    .json::<User>()
                    .await;
                user_tx_cloned.send(resp.unwrap()).await.unwrap();
            }
        });
        // quit if the response comes back empty
        if r.len() == 0 {
            break;
        }
    }
    // read from the user channel
    while let Some(user) = user_rx.recv().await {
        println!("found user {}", user.id);
    }
}
Since your program is network-IO-bound, I suggest forgetting about threading and just using concurrency. By "network-IO-bound" I mean that your program spends most of its time waiting for HTTP requests to complete; other than that (unless your JSON responses are huge) there isn't much computation happening here. For such a use case, multithreading offers little benefit and may even be harmful, since the overhead of thread synchronization is expensive (you would have to profile to find out).
This means removing the channels and the tokio::spawn calls completely and relying entirely on the stream abstraction. This makes your program essentially single-threaded: all the JSON parsing and printing happens on one thread.
Using only streams, no threads, we can write your program as follows.
In the first stage you query the posts endpoint for every user id. This produces a stream of Vec<Post> responses, since filter_map discards any Nones returned by the function (failed requests or failed parses). Note that if this stream isn't productive (i.e. the function never returns Some), then the rest of the stream pipeline will block forever, which is something you would have to handle in real code:
// from the stream-flatten-iters package
use stream_flatten_iters::StreamExt as sfi_StreamExt;
use futures::future;
use futures::stream::StreamExt;
// `pin!` is in std as of Rust 1.68
use std::pin::pin;

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let mut stream = pin!(futures::stream::iter((1..).step_by(1))
        .filter_map(|i| {
            // cloning the client (it is internally an Arc) lets the
            // future own everything it needs
            let client = client.clone();
            let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
            async move {
                let resp = client.get(url).send().await.ok()?;
                let parsed_resp = resp.json::<Vec<Post>>().await.ok()?;
                Some(parsed_resp)
            }
        })
Next we implement your exit condition: when the posts endpoint first returns an empty array, we terminate the stream:
        .take_while(|r| {
            future::ready(r.len() != 0)
        })
Then we concurrently create another request for every Post. Note that this polls each created request concurrently (up to the limit passed into flat_map_unordered) but does not introduce any parallelism. In other words, suppose you get 10 elements from the first request above; then flat_map_unordered immediately invokes its function 10 times, saves the resulting 10 requests, and every time flat_map_unordered is polled, it polls each of those requests and produces a value for any request that has completed. All this happens on one thread; there is no parallelism involved:
        // first converts Stream<Vec<Post>> to Stream<Post>
        .flatten_iters()
        // creates a concurrent request for every Post
        .flat_map_unordered(10, |p| {
            let user_id = p.user_id;
            let client = client.clone();
            Box::pin(futures::stream::once(async move {
                let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = user_id);
                client
                    .get(url)
                    .send()
                    .await
                    .unwrap()
                    .json::<User>()
                    .await
            }))
        }));
And finally we consume the stream in the body of main:
    // consume the stream of users
    while let Some(user) = stream.next().await {
        println!("{:?}", user);
    }
}
Your next step would be to profile this program in a real world scenario and determine if you have any performance bottlenecks. If you find any, you can parallelize that part of the code.
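For example (a sketch of one option, not part of the program above): if profiling showed the per-user requests and their JSON parsing becoming a CPU bottleneck, you could reintroduce parallelism for just that stage by spawning each request as a tokio task and limiting concurrency with buffer_unordered. The helper name fetch_users_parallel is made up for illustration; it assumes the same client, Post, and User as above:

use futures::stream::StreamExt;

// Hypothetical sketch: run the per-post user requests on the
// multi-threaded tokio runtime instead of a single task.
async fn fetch_users_parallel(client: reqwest::Client, posts: Vec<Post>) -> Vec<User> {
    futures::stream::iter(posts)
        .map(|p| {
            let client = client.clone();
            // tokio::spawn moves the request (and its JSON parsing) onto
            // the runtime's worker threads, so tasks can run in parallel
            tokio::spawn(async move {
                let url = format!("https://jsonplaceholder.typicode.com/users/{}", p.user_id);
                client.get(url).send().await.unwrap().json::<User>().await.unwrap()
            })
        })
        // the map stream is lazy, so buffer_unordered spawns tasks on
        // demand and keeps at most 10 JoinHandles in flight at a time
        .buffer_unordered(10)
        .map(|joined| joined.expect("task panicked"))
        .collect()
        .await
}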