I am setting up my first API using the Rust warp crate, and I can't get anywhere close to the performance I was hoping for.
To rule out problems with my own implementation, I added a simple route like this:
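```rust
use std::sync::Arc;
use warp::{get, path, reply::html, Filter, Rejection, Reply};

use crate::App;

async fn ping() -> Result<impl Reply, Rejection> {
    Ok(html("pong"))
}

// `app` is not used by this route yet; the leading underscore silences the warning.
pub fn routes(_app: Arc<App>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
    path!("v1" / "pub" / "ping")
        .and(get())
        .and_then(ping)
}
```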
When I curl this endpoint I get the expected result, but when I load test it with JMeter, the maximum throughput I can get is about 2,000 requests/sec, and above a few hundred requests/sec JMeter records a high error rate (60%+).
All of the JMeter errors say "Connection refused". It looks like warp/hyper refuses connections under load rather than returning 503 responses, which is one issue. More importantly, my CPU usage stays below 5% during the load test, and 2,000 requests/sec is quite poor.
The web framework benchmark site claims about 500,000 requests/sec, so I must be missing something, but this is a very basic setup.
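On the 503 point: a connection that is refused never reaches application code, so an application can only answer 503 on connections it has already accepted. The std-only sketch below shows one common way to do that in a thread-pool server like the ones later in this post. Accepted connections go onto a bounded queue, and when the queue is full the acceptor writes a 503 and closes the socket instead of letting work pile up. This illustrates the general technique, not how warp or hyper behave. `dispatch`, `BUSY`, and `shed_demo` are hypothetical names, and the queue capacity of 1 is only there to make the demo deterministic.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Canned 503; `Connection: close` tells the client this socket will not be reused.
const BUSY: &[u8] =
    b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";

/// Queues an accepted connection for the worker threads, or, if the bounded queue
/// is full (or the workers are gone), answers 503 and closes it. Returns true if queued.
fn dispatch(queue: &SyncSender<TcpStream>, stream: TcpStream) -> bool {
    match queue.try_send(stream) {
        Ok(()) => true,
        Err(TrySendError::Full(mut stream)) | Err(TrySendError::Disconnected(mut stream)) => {
            // Best effort: the client may already have gone away.
            let _ = stream.write_all(BUSY);
            false // `stream` is dropped here, which closes the connection
        }
    }
}

/// Demo: a queue with room for one connection and no worker draining it.
fn shed_demo() -> std::io::Result<(bool, bool, String)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let (queue, _workers) = sync_channel::<TcpStream>(1); // keep the receiver alive

    let _client1 = TcpStream::connect(addr)?;
    let (server1, _) = listener.accept()?;
    let first = dispatch(&queue, server1); // fills the queue

    let mut client2 = TcpStream::connect(addr)?;
    let (server2, _) = listener.accept()?;
    let second = dispatch(&queue, server2); // queue is full, so this gets a 503

    let mut reply = String::new();
    client2.read_to_string(&mut reply)?;
    Ok((first, second, reply))
}

fn main() -> std::io::Result<()> {
    let (first, second, reply) = shed_demo()?;
    println!("first queued: {first}, second queued: {second}");
    println!("second client got: {}", reply.lines().next().unwrap_or(""));
    Ok(())
}
```

A real server would size the queue to its workers and drain it from `recv()` in each worker thread. The point is that rejection then happens with an HTTP status the client can see.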
-- EDIT --
To take JMeter out of the equation, I wrote the following Rust test client and got up to 7,000 requests/sec, which is better but still not impressive:
```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use http_body_util::Empty;
use hyper::{Request, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
use tokio::task;

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

#[tokio::main]
async fn main() -> Result<()> {
    let url = "http://localhost:8000/v1/pub/ping".parse::<hyper::Uri>()?;
    let concurrency = 20;
    let repeat_count = 500;

    let authority = url.authority().unwrap();
    let addr = Arc::new(String::from(authority.as_str()));
    let req = Arc::new(Request::builder()
        .method("GET")
        .uri(url.path())
        .header(hyper::header::HOST, authority.as_str())
        .header(hyper::header::CONNECTION, "keep-alive")
        .body(Empty::<Bytes>::new())?);

    // Warm-up request (it uses its own connection, which is closed afterwards)
    fetch(addr.clone(), req.clone()).await?;

    let start = Instant::now();
    let mut failures = 0;
    let mut tasks = Vec::new();
    for _ in 0..concurrency {
        tasks.push(task::spawn(fetch(addr.clone(), req.clone())));
    }
    // Keep `concurrency` requests in flight: each time one finishes, start another.
    for _ in 1..repeat_count {
        for _ in 0..concurrency {
            let task = tasks.remove(0);
            if task.await?.is_err() {
                failures += 1; // count failed requests instead of silently ignoring them
            }
            tasks.push(task::spawn(fetch(addr.clone(), req.clone())));
        }
    }
    for task in tasks {
        if task.await?.is_err() {
            failures += 1;
        }
    }

    let elapsed = start.elapsed();
    let total = concurrency * repeat_count;
    println!("Elapsed: {:.2?} GET {} {} times with {} concurrency ({} failed)", elapsed, url, total, concurrency, failures);
    println!("Average {:.0} req/sec", total as f64 / elapsed.as_secs_f64());
    Ok(())
}

// NOTE: every call opens a brand-new TCP connection and drops it after one
// response, so the `Connection: keep-alive` header has no effect here and each
// request pays for a full TCP handshake and teardown.
async fn fetch(addr: Arc<String>, req: Arc<Request<Empty<Bytes>>>) -> Result<()> {
    let stream = TcpStream::connect(&*addr).await?;
    let io = TokioIo::new(stream);
    let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
    // Drive the connection in the background while the request is sent.
    task::spawn(async move {
        if let Err(err) = conn.await {
            println!("Connection failed: {:?}", err);
        }
    });

    let res = sender.send_request((*req).clone()).await?;
    match res.status() {
        StatusCode::OK => Ok(()),
        status => Err(Box::new(HttpError::new(status))),
    }
}

#[derive(Debug)]
struct HttpError {
    pub status: StatusCode,
}

impl HttpError {
    pub fn new(status: StatusCode) -> Self {
        Self { status }
    }
}

impl fmt::Display for HttpError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Http status code {}", self.status)
    }
}

// The default `source()` already returns `None`; `description` and `cause` are deprecated.
impl Error for HttpError {}
```
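The `fetch` function above calls `TcpStream::connect` for every request, so each of the 10,000 timed requests also pays for a TCP handshake and teardown. The std-only sketch below makes that cost visible by timing both patterns against a throwaway local server. It does not use hyper. `spawn_server`, `read_head`, `new_connection_each_time`, and `one_connection` are hypothetical names, and the numbers it prints depend entirely on the machine.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;
use std::time::Instant;

const REQUEST: &[u8] = b"GET /v1/pub/ping HTTP/1.1\r\nHost: localhost\r\n\r\n";
const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";

/// Throwaway local server: one thread per connection, answering every request
/// on that connection until the client hangs up.
fn spawn_server() -> io::Result<SocketAddr> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    thread::spawn(move || {
        for stream in listener.incoming().flatten() {
            thread::spawn(move || {
                let mut reader = BufReader::new(&stream);
                let mut writer = &stream;
                let mut line = String::new();
                loop {
                    line.clear();
                    match reader.read_line(&mut line) {
                        Ok(0) | Err(_) => return, // client closed the connection
                        // A blank line ends the request head: send the response.
                        Ok(_) if line == "\r\n" => {
                            if writer.write_all(RESPONSE).is_err() {
                                return;
                            }
                        }
                        Ok(_) => {} // request line or header: ignored
                    }
                }
            });
        }
    });
    Ok(addr)
}

/// Reads a response head up to the blank line; true if the status was 200.
fn read_head(reader: &mut impl BufRead) -> io::Result<bool> {
    let mut status = String::new();
    reader.read_line(&mut status)?;
    let mut line = String::new();
    loop {
        line.clear();
        if reader.read_line(&mut line)? == 0 || line == "\r\n" {
            return Ok(status.starts_with("HTTP/1.1 200"));
        }
    }
}

/// Same pattern as `fetch`: a fresh TCP connection for every request.
fn new_connection_each_time(addr: SocketAddr, n: usize) -> io::Result<usize> {
    let mut ok = 0;
    for _ in 0..n {
        let mut stream = TcpStream::connect(addr)?;
        stream.write_all(REQUEST)?;
        if read_head(&mut BufReader::new(&stream))? {
            ok += 1;
        }
    }
    Ok(ok)
}

/// Keep-alive: one connection reused for all `n` requests.
fn one_connection(addr: SocketAddr, n: usize) -> io::Result<usize> {
    let stream = TcpStream::connect(addr)?;
    let mut reader = BufReader::new(&stream);
    let mut writer = &stream;
    let mut ok = 0;
    for _ in 0..n {
        writer.write_all(REQUEST)?;
        if read_head(&mut reader)? {
            ok += 1;
        }
    }
    Ok(ok)
}

fn main() -> io::Result<()> {
    let addr = spawn_server()?;
    let n = 2_000;
    let start = Instant::now();
    let ok = new_connection_each_time(addr, n)?;
    println!("new connection per request: {ok}/{n} OK in {:.2?}", start.elapsed());
    let start = Instant::now();
    let ok = one_connection(addr, n)?;
    println!("one keep-alive connection:  {ok}/{n} OK in {:.2?}", start.elapsed());
    Ok(())
}
```

With hyper's low-level client, the equivalent of `one_connection` would be to keep the `SendRequest` returned by `handshake` and send many requests through it instead of calling `fetch` again.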
-- EDIT --
Just for the hell of it, I decided to write a bare-bones web server using only std. It is slightly faster than warp, serving about 7,500 requests/sec. This is the code:
```rust
use crate::App;
use std::{
    io::{prelude::*, BufReader},
    net::{Ipv4Addr, TcpListener, TcpStream},
    sync::{atomic::Ordering, mpsc, Arc},
    thread::{self, available_parallelism},
};

pub fn run(app: Arc<App>, ipv4: Ipv4Addr, port: u16) {
    let authority = format!("{}:{}", ipv4, port);
    let listener = TcpListener::bind(authority).unwrap();
    let concurrency = available_parallelism().unwrap().get();
    let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(concurrency);
    let mut senders: Vec<mpsc::Sender<TcpStream>> = Vec::with_capacity(concurrency);

    // One worker thread per core, each fed through its own channel.
    for _ in 0..concurrency {
        let (sender, receiver) = mpsc::channel();
        senders.push(sender);
        let app = app.clone();
        threads.push(thread::spawn(move || process_connections(app, receiver)))
    }

    // Hand accepted connections to the workers round-robin.
    let mut thread_index = 0;
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                senders[thread_index].send(stream).unwrap();
                thread_index = (thread_index + 1) % concurrency;
            }
            Err(err) => println!("Err accepting connection: {}", err),
        }
    }
}

fn process_connections(app: Arc<App>, receiver: mpsc::Receiver<TcpStream>) {
    // Runs until the sending side of the channel is dropped.
    for stream in receiver {
        handle_connection(&app, stream);
    }
}

fn handle_connection(app: &App, mut stream: TcpStream) {
    // Read the request head, stopping at the blank line or the first I/O error
    // (an unwrap() here would kill the worker thread on one bad connection).
    let _http_request: Vec<_> = BufReader::new(&stream)
        .lines()
        .map_while(Result::ok)
        .take_while(|line| !line.is_empty())
        .collect();

    // No Content-Length: the end of the response is signalled by closing the
    // connection, so every request needs a new TCP connection.
    let response = "HTTP/1.1 200 OK\r\n\r\n";
    if stream.write_all(response.as_bytes()).is_ok() {
        app.request_count.fetch_add(1, Ordering::Relaxed);
    }
}
```
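The response in this server has no `Content-Length`, so the only way an HTTP/1.1 client can tell where it ends is that the server closes the connection, which forces a new TCP connection for every request. Declaring the body length is what allows one connection to carry the next request. A minimal sketch of such a response builder; `build_response` is a hypothetical helper, and it assumes a plain-text body without chunked encoding:

```rust
/// Builds a minimal HTTP/1.1 response that declares its body length, so the client
/// knows where the response ends without waiting for the connection to close.
fn build_response(status: &str, body: &str) -> String {
    format!(
        "HTTP/1.1 {status}\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: {}\r\n\r\n{body}",
        body.len() // length in bytes (UTF-8), not in characters
    )
}

fn main() {
    // Two responses written back-to-back on one connection stay unambiguous.
    let mut wire = build_response("200 OK", "pong");
    wire.push_str(&build_response("200 OK", ""));
    print!("{wire}");
}
```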
Maybe the problem is on the client side, or maybe this is just a fundamental limit of my MacBook Pro. It's a mystery.
Maybe I will try streaming multiple requests over a single connection and see how that works.
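Sending several requests over one connection without waiting for each response is HTTP/1.1 pipelining. On the server side it only works if the reader that parses requests lives for the whole connection. A `BufReader` may already have buffered the start of the next request, and creating a fresh one per request would lose those bytes. The keep-alive server later in this post does create its reader once per connection. A sketch of splitting pipelined request heads from one buffered stream; `next_request` is a hypothetical helper, an in-memory `Cursor` stands in for the `TcpStream`, and requests are assumed to have no body:

```rust
use std::io::{self, BufRead, BufReader, Cursor};

/// Reads one request head (request line plus headers, without the blank line).
/// Returns Ok(None) when the stream ends before a complete head arrives.
fn next_request(reader: &mut impl BufRead) -> io::Result<Option<Vec<String>>> {
    let mut head = Vec::new();
    loop {
        let mut line = String::new();
        if reader.read_line(&mut line)? == 0 {
            return Ok(None); // EOF: no complete request left
        }
        let trimmed = line.trim_end(); // drop the trailing "\r\n"
        if trimmed.is_empty() {
            return Ok(Some(head)); // blank line ends the head
        }
        head.push(trimmed.to_string());
    }
}

fn main() -> io::Result<()> {
    // Two requests sent back-to-back, before any response has been read.
    let wire: &[u8] = b"GET /a HTTP/1.1\r\nHost: x\r\n\r\nGET /b HTTP/1.1\r\nHost: x\r\n\r\n";
    // One reader for the whole connection, reused for every request.
    let mut reader = BufReader::new(Cursor::new(wire));
    while let Some(head) = next_request(&mut reader)? {
        println!("{}", head.join(" | "));
    }
    Ok(())
}
```

The server must still send the responses in the same order as the requests arrived. Handling them one at a time on the connection's thread satisfies that automatically.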
A couple of days later, I now know what's going on: in my setup, hyper was not keeping connections alive. I figured this out by writing a client application that uses low-level sockets. I could see that the server (I tried warp, and also hyper directly) closed the connection after the first request, regardless of whether I sent the keep-alive header.
After discovering this, I wrote the server using sockets as well and implemented keep-alive properly. This version processes about 70,000 requests/sec.
I'm still not sure how the web framework benchmarks reach 500,000 requests/sec unless they run on very large machines with tens of physical cores, but I am quite satisfied with 70k req/sec on my MacBook Pro.
I guess the answer to my original question of how to make warp fast is to get keep-alive working with hyper.
For anyone who wants to try this, here is my code:
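For reference, HTTP/1.1 connections are persistent by default. A client is not required to send `Connection: keep-alive`, and either side ends persistence with `Connection: close`; the explicit `keep-alive` token only matters for HTTP/1.0 clients. The hand-written server below keeps a connection open only when it sees the exact line `connection: keep-alive`. A sketch of the full decision rule from RFC 9112 (section 9.3), assuming the request head has already been split into a request line and header lines; `wants_keep_alive` is a hypothetical helper, not part of hyper or warp:

```rust
/// Decides whether the server may keep the connection open after responding,
/// following the HTTP/1.1 persistence rules (RFC 9112, section 9.3).
/// `request_line` is e.g. "GET / HTTP/1.1"; `headers` are raw "Name: value" lines.
fn wants_keep_alive(request_line: &str, headers: &[&str]) -> bool {
    // Gather every Connection option: the header may repeat, is comma-separated,
    // and its tokens are case-insensitive.
    let mut options = Vec::new();
    for header in headers {
        if let Some((name, value)) = header.split_once(':') {
            if name.trim().eq_ignore_ascii_case("connection") {
                options.extend(value.split(',').map(|t| t.trim().to_ascii_lowercase()));
            }
        }
    }
    let has = |token: &str| options.iter().any(|o| o == token);

    if has("close") {
        return false; // any version: the client asked to close
    }
    match request_line.rsplit(' ').next() {
        Some("HTTP/1.1") => true,              // persistent unless told otherwise
        Some("HTTP/1.0") => has("keep-alive"), // persistent only on request
        _ => false,                            // unknown version: close to be safe
    }
}

fn main() {
    for (line, headers) in [
        ("GET / HTTP/1.1", vec!["Host: x"]),
        ("GET / HTTP/1.1", vec!["Connection: close"]),
        ("GET / HTTP/1.0", vec!["Connection: Keep-Alive"]),
    ] {
        println!("{line} {headers:?} -> {}", wants_keep_alive(line, &headers));
    }
}
```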
Server:
```rust
use std::{
    io::{prelude::*, BufReader, BufWriter},
    net::{Ipv4Addr, TcpListener, TcpStream},
    sync::{atomic::Ordering, mpsc, Arc},
    thread::{self, available_parallelism},
};

use crate::App;

pub fn run(app: Arc<App>, ipv4: Ipv4Addr, port: u16) {
    let authority = format!("{}:{}", ipv4, port);
    let listener = TcpListener::bind(authority).unwrap();
    let concurrency = available_parallelism().unwrap().get();
    let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(concurrency);
    let mut senders: Vec<mpsc::Sender<TcpStream>> = Vec::with_capacity(concurrency);

    // One worker thread per core, each fed through its own channel.
    for index in 0..concurrency {
        let (sender, receiver) = mpsc::channel();
        senders.push(sender);
        let app = app.clone();
        threads.push(thread::spawn(move || process_connections(app, receiver, index + 1)))
    }

    // Hand accepted connections to the workers round-robin.
    let mut thread_index = 0;
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                senders[thread_index].send(stream).unwrap();
                thread_index = (thread_index + 1) % concurrency;
            }
            Err(err) => println!("Err accepting connection: {}", err),
        }
    }
}

fn process_connections(app: Arc<App>, receiver: mpsc::Receiver<TcpStream>, index: usize) {
    // A worker serves one connection at a time; with keep-alive, a connection
    // occupies its worker until the client closes it.
    for stream in receiver {
        handle_connection(&app, stream, index);
    }
}

fn handle_connection(app: &App, stream: TcpStream, index: usize) {
    // Create the reader once per connection: it may already hold buffered bytes
    // of the next request.
    let mut request_lines = BufReader::new(&stream)
        .lines()
        .map(|result| match result {
            Ok(line) => line,
            Err(err) => {
                println!("{}# {}", index, err);
                String::default() // an empty line ends extract_request
            }
        });
    let mut response_writer = BufWriter::new(&stream);
    loop {
        let http_request = extract_request(&mut request_lines);
        if http_request.is_empty() {
            return; // client closed the connection (or a read failed)
        }
        // Only an explicit keep-alive header keeps the connection open here,
        // although HTTP/1.1 clients may treat persistence as the default.
        let keep_alive = http_request
            .iter()
            .any(|line| line.eq_ignore_ascii_case("connection: keep-alive"));

        // Content-Length tells the client where the response ends, so the
        // connection can carry the next request.
        let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
        let written = response_writer.write_all(response);
        if let Err(err) = written.and_then(|_| response_writer.flush()) {
            println!("{}# Err writing response to stream: {}", index, err);
            return;
        }
        app.request_count.fetch_add(1, Ordering::Relaxed);
        if !keep_alive {
            return;
        }
    }
}

/// Collects one request head (request line and headers) up to the blank line.
/// Request bodies are not read, which is fine for these GET-only tests.
fn extract_request(lines: &mut impl Iterator<Item = String>) -> Vec<String> {
    let mut result = Vec::new();
    for line in lines {
        if line.is_empty() {
            break;
        }
        result.push(line);
    }
    result
}
```
Client:
```rust
use std::{
    io::{BufRead, BufReader, BufWriter, Write},
    net::TcpStream,
    thread::{self, available_parallelism},
    time::Instant,
};

// Note: although `async`, this function only does blocking work on OS threads.
pub async fn run_test() -> super::Result<()> {
    let authority = "localhost:8000";
    let path = "/v1/admin/nodes";
    let repeat_count: usize = 100_000;
    let concurrency = available_parallelism().unwrap().get();
    let request_text = format!(
        "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: keep-alive\r\n\r\n",
        path, authority
    );

    let start = Instant::now();
    // One thread and one keep-alive connection per core, each sending `repeat_count` requests.
    let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(concurrency);
    for _ in 0..concurrency {
        let request = request_text.clone();
        threads.push(thread::spawn(move || make_request(authority, repeat_count, request.as_bytes())));
    }
    for thread in threads {
        if thread.join().is_err() {
            println!("A client thread panicked");
        }
    }
    let elapsed = start.elapsed();
    let total = concurrency * repeat_count;
    println!(
        "Elapsed: {:.2?} GET {}{} {} times with {} concurrency",
        elapsed, authority, path, total, concurrency
    );
    println!("Average {} req/sec", (total as f64 / elapsed.as_secs_f64()).floor());
    Ok(())
}

fn make_request(authority: &str, count: usize, request: &[u8]) {
    let stream = TcpStream::connect(authority).unwrap();
    let mut response_lines = BufReader::new(&stream)
        .lines()
        .map(|result| match result {
            Ok(line) => line,
            Err(err) => {
                println!("{}", err);
                String::default()
            }
        });
    let mut request_writer = BufWriter::new(&stream);
    for _ in 0..count {
        // Send one request, then wait for its response before sending the next.
        request_writer.write_all(request).unwrap();
        request_writer.flush().unwrap();
        let _ = extract_response(&mut response_lines);
    }
}

/// Collects the status line and headers of one response, up to the blank line.
/// The body is never read, so this relies on the server sending `Content-Length: 0`.
fn extract_response(lines: &mut impl Iterator<Item = String>) -> Vec<String> {
    let mut result = Vec::new();
    for line in lines {
        if line.is_empty() {
            break;
        }
        result.push(line);
    }
    result
}
```
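The client's `extract_response` stops at the blank line and never reads a body, which works only because this server always sends `Content-Length: 0`. If the server returned a body such as `pong`, those bytes would be read as the start of the next response's status line. A sketch of a reader that uses `Content-Length` to consume exactly one response, so consecutive responses on a keep-alive connection stay aligned. `read_response` and `Response` are hypothetical names, chunked transfer encoding is deliberately not handled, and a `Cursor` stands in for the socket:

```rust
use std::io::{self, BufRead, Cursor};

/// One parsed HTTP/1.1 response: status line, header lines, and body bytes.
#[derive(Debug)]
struct Response {
    status: String,
    headers: Vec<String>,
    body: Vec<u8>,
}

/// Reads one response, using Content-Length to find the end of the body so the
/// next response on the same connection starts in the right place.
/// Assumes no chunked transfer encoding. Returns Ok(None) at a clean EOF.
fn read_response(reader: &mut impl BufRead) -> io::Result<Option<Response>> {
    let mut status = String::new();
    if reader.read_line(&mut status)? == 0 {
        return Ok(None);
    }
    let mut headers = Vec::new();
    let mut content_length = 0usize;
    loop {
        let mut line = String::new();
        if reader.read_line(&mut line)? == 0 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "truncated headers"));
        }
        let line = line.trim_end().to_string();
        if line.is_empty() {
            break; // blank line: headers are done
        }
        if let Some((name, value)) = line.split_once(':') {
            if name.trim().eq_ignore_ascii_case("content-length") {
                content_length = value.trim().parse().map_err(|_| {
                    io::Error::new(io::ErrorKind::InvalidData, "bad Content-Length")
                })?;
            }
        }
        headers.push(line);
    }
    // Read exactly the declared number of body bytes, no more.
    let mut body = vec![0u8; content_length];
    reader.read_exact(&mut body)?;
    Ok(Some(Response { status: status.trim_end().to_string(), headers, body }))
}

fn main() -> io::Result<()> {
    // Two responses back-to-back, as they would arrive on one keep-alive connection.
    let wire: &[u8] =
        b"HTTP/1.1 200 OK\r\nContent-Length: 4\r\n\r\npongHTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
    let mut reader = Cursor::new(wire);
    while let Some(resp) = read_response(&mut reader)? {
        println!("{} {:?} body={:?}", resp.status, resp.headers, String::from_utf8_lossy(&resp.body));
    }
    Ok(())
}
```

In the client above, a `BufReader` over the `TcpStream` would replace the `Cursor`, and `read_response` would replace the `lines()` iterator so that body bytes are never treated as text lines.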