I'm writing a tonic middleware to load application state into a gRPC handler, and I cannot get my app to serve with the middleware layer
I've double-checked the types and looked at many examples and I can't tell what I've gotten wrong in my implementation. Not only that, but the help
message in the compiler error seems to indicate that I have implemented it?
Any help would be greatly appreciated!
Full error on compilation:
error[E0277]: the trait bound `RegistryApplicationStateMiddleware<Routes>: Service<http::request::Request<tonic::transport::Body>>` is not satisfied
--> registry/src/main.rs:39:6
|
39 | .serve(addr)
| ^^^^^ the trait `Service<http::request::Request<tonic::transport::Body>>` is not implemented for `RegistryApplicationStateMiddleware<Routes>`
|
= help: the trait `Service<http::Request<tonic::transport::Body>>` is implemented for `RegistryApplicationStateMiddleware<S>`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `registry` (bin "registry") due to 1 previous error
Note the help message indicates the trait is implemented for a generic version of the middleware?? but is somehow not matching Routes...
middleware.rs
use std::{convert::Infallible, task::{Context, Poll}};
use data_access::db::ConnectionPool;
use http::Response;
use tonic::{body::BoxBody, server::NamedService};
use tower_service::Service;
#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationState {
pub pool: ConnectionPool,
}
#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationStateLayer {
pub pool: ConnectionPool,
}
impl<S> tower_layer::Layer<S> for RegistryApplicationStateLayer {
type Service = RegistryApplicationStateMiddleware<S>;
fn layer(&self, service: S) -> Self::Service {
RegistryApplicationStateMiddleware {
inner: service,
pool: self.pool.clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct RegistryApplicationStateMiddleware<S> {
inner: S,
pub pool: ConnectionPool,
}
impl<S> Service<http::request::Request<tonic::transport::Body>> for RegistryApplicationStateMiddleware<S>
where
S: Service<http::request::Request<tonic::transport::Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
{
type Response = S::Response;
type Error = S::Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: http::request::Request<tonic::transport::Body>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
req.extensions_mut().insert(RegistryApplicationState {
pool: self.pool.clone(),
});
Box::pin(async move {
// Do extra async work here...
let response = inner.call(req).await?;
Ok(response)
})
}
}
main.rs
use middleware::RegistryApplicationStateLayer;
use tonic::transport::Server;
use client::registry_service_server::RegistryServiceServer;
use server::DemoClient;
use data_access::db::ConnectionPool;
pub mod client;
mod middleware;
pub mod server;
mod store_proto {
include!("client.rs");
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("store_descriptor");
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();
let addr = "127.0.0.1:9001".parse()?;
let demo_registry_service = DemoClient::default();
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(store_proto::FILE_DESCRIPTOR_SET)
.build()
.unwrap();
let pool = ConnectionPool::default();
Server::builder()
.layer(RegistryApplicationStateLayer { pool })
.add_service(RegistryServiceServer::new(demo_registry_service))
.add_service(reflection_service)
.serve(addr)
.await?;
Ok(())
}
Another note, this is what the Router service implementation looks like in the tonic lib, which I tried to emulate in my implementation:
/// A [`Service`] router.
#[derive(Debug, Default, Clone)]
pub struct Routes {
router: axum::Router,
}
impl Routes {
pub(crate) fn new<S>(svc: S) -> Self
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send,
{
let router = axum::Router::new().fallback(unimplemented);
Self { router }.add_service(svc)
}
pub(crate) fn add_service<S>(mut self, svc: S) -> Self
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send,
{
let svc = svc.map_response(|res| res.map(axum::body::boxed));
self.router = self
.router
.route_service(&format!("/{}/*rest", S::NAME), svc);
self
}
pub(crate) fn prepare(self) -> Self {
Self {
// this makes axum perform update some internals of the router that improves perf
// see https://docs.rs/axum/latest/axum/routing/struct.Router.html#a-note-about-performance
router: self.router.with_state(()),
}
}
}
Lastly, here's my Cargo.toml file
[package]
name = "registry"
version = "0.1.0"
build = "build.rs"
edition = "2021"
publish = false
[dependencies]
axum = { version = "0.7.5", features = [ "json" ] }
clap = { version = "4.1.4", features = ["derive"] }
dotenv = "0.15.0"
futures = "0.3"
http = "1.1.0"
hyper = "1.2.0"
prost = "0.11"
tokio = { version = "1.37", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tonic-reflection = "0.6.0"
tower-http = { version = "0.5.0", features = ["trace"] }
tower-layer = "0.3.2"
tower-request-id = "0.3.0"
tower-service = "0.3.2"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
bytes = "1.6.0"
[build-dependencies]
tonic-build = "0.8"
[dev-dependencies]
uuid = { version = "1.2.2", features = ["v4", "fast-rng"] }
futures-util = "0.3.25"
anyhow = "1"
Always double-check your dependencies, kids. The problem was I had hyper 1.0
installed and Tonic doesn't support hyper 1.0 yet. Here's the working (compiling) middleware with hyper = "0.14.28"
use std::task::{Context, Poll};
use axum::extract::FromRef;
use data_access::db::Pool;
use hyper::body::Body;
use tonic::body::BoxBody;
use tower_service::Service;
#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationState {
pub pool: Pool,
}
impl FromRef<RegistryApplicationState> for Pool {
fn from_ref(app_state: &RegistryApplicationState) -> Pool {
app_state.pool.clone()
}
}
#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationStateLayer {
pub pool: Pool,
}
impl<S> tower_layer::Layer<S> for RegistryApplicationStateLayer {
type Service = RegistryApplicationStateMiddleware<S>;
fn layer(&self, service: S) -> Self::Service {
RegistryApplicationStateMiddleware {
inner: service,
pool: self.pool.clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct RegistryApplicationStateMiddleware<S> {
inner: S,
pub pool: Pool,
}
impl<S> Service<hyper::Request<Body>>
for RegistryApplicationStateMiddleware<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
req.extensions_mut().insert(RegistryApplicationState {
pool: self.pool.clone(),
});
Box::pin(async move {
// Do extra async work here...
let response = inner.call(req).await?;
Ok(response)
})
}
}