I have a case where I have an HTTP server built with axum that receives payloads at a very high throughput (it could get to 20m a second). I need to take those bytes from the request and do some heavy computations with them. The problem is that memory usage grows unpredictably high (it can reach 5 GB). This is my current setup:
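```rust
use axum::{body::Bytes, routing::post, Router};
use prost::Message; // assumes `WriteRequest` is a prost-generated message
use tokio::sync::mpsc;
use tracing::error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bounded channel: `send().await` waits once 32 payloads are queued.
    let (tx, mut rx) = mpsc::channel::<WriteRequest>(32);

    // Consumer task: runs one heavy computation at a time on the blocking pool.
    tokio::spawn(async move {
        while let Some(payload) = rx.recv().await {
            let _ = tokio::task::spawn_blocking(move || {
                // Run heavy computation here...
                heavy_computation(payload)
            })
            .await;
        }
    });

    // build our application with a route
    let app = Router::new().route(
        "/write",
        post(move |req: Bytes| {
            // Each request gets its own sender clone, so the handler closure stays `Clone`.
            let tx = tx.clone();
            async move {
                let data = match WriteRequest::decode(req) {
                    Ok(data) => data,
                    Err(_) => return "invalid payload",
                };
                // send data here; this waits while the channel is full
                let _ = tx.send(data).await;
                "ok"
            }
        }),
    );

    let addr = ([0, 0, 0, 0], 8080).into();
    let server = axum::Server::bind(&addr).serve(app.into_make_service());
    if let Err(e) = server.await {
        error!("server error: {}", e);
    }
    Ok(())
}
```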
I think it's the back-pressure on the bounded channel: requests keep piling up while they wait to be sent to the other task for processing, and that drives memory up. Even when I replaced `heavy_computation` with a simple sleep of about 200 ms, I got the same result. If I remove the `heavy_computation` part, memory stays low.
What is the right way to approach this kind of problem? Or is there nothing that can be done at this throughput?
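It looks like pending requests pile up while `heavy_computation` is busy. The consumer task awaits each `spawn_blocking` call before receiving the next payload, so it handles one payload at a time (about 5 per second with a 200 ms sleep); once the 32-slot channel is full, every other request waits in `tx.send(data).await` with its body already in memory. You need a limit on the number of connections/requests accepted and handled at a time. Reaching 5 GB doesn't even take millions of pending requests: 25K pending requests with 200 KB payloads is enough (25,000 × 200 KB = 5 GB).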
axum is built on top of hyper (the HTTP implementation) and tower (the `Service`/middleware abstraction).
It is a known issue that hyper doesn't have a max-connections setting; people in that issue suggest using tower's `ConcurrencyLimit` middleware and configuring it into the `Server`, or writing a custom accept/handle loop.
axum's `Router::layer` accepts tower middleware, so you can apply `ConcurrencyLimit` through axum as well. Otherwise, if it is an option for you, you could go to tower directly, or even bare hyper, and implement the limit with the primitives available there for this workload.