genericsrusttraitsimplementation

Missing implementation (but help message says I have implemented it)


I'm writing a tonic middleware to load application state into a gRPC handler, and I cannot get my app to serve with the middleware layer

I've double-checked the types and looked at many examples and I can't tell what I've gotten wrong in my implementation. Not only that, but the help message in the compiler error seems to indicate that I have implemented it?

Any help would be greatly appreciated!

Full error on compilation:

error[E0277]: the trait bound `RegistryApplicationStateMiddleware<Routes>: Service<http::request::Request<tonic::transport::Body>>` is not satisfied
  --> registry/src/main.rs:39:6
   |
39 |     .serve(addr)
   |      ^^^^^ the trait `Service<http::request::Request<tonic::transport::Body>>` is not implemented for `RegistryApplicationStateMiddleware<Routes>`
   |
   = help: the trait `Service<http::Request<tonic::transport::Body>>` is implemented for `RegistryApplicationStateMiddleware<S>`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `registry` (bin "registry") due to 1 previous error

Note the help message indicates the trait is implemented for a generic version of the middleware?? but is somehow not matching Routes...

middleware.rs

use std::{convert::Infallible, task::{Context, Poll}};

use data_access::db::ConnectionPool;
use http::Response;
use tonic::{body::BoxBody, server::NamedService};
use tower_service::Service;

#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationState {
    pub pool: ConnectionPool,
}


#[derive(Debug, Clone, Default)]
pub struct RegistryApplicationStateLayer {
    pub pool: ConnectionPool,
}

impl<S> tower_layer::Layer<S> for RegistryApplicationStateLayer {
    type Service = RegistryApplicationStateMiddleware<S>;

    fn layer(&self, service: S) -> Self::Service {
        RegistryApplicationStateMiddleware {
            inner: service,
            pool: self.pool.clone(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct RegistryApplicationStateMiddleware<S> {
    inner: S,
    pub pool: ConnectionPool,
}


impl<S> Service<http::request::Request<tonic::transport::Body>> for RegistryApplicationStateMiddleware<S>
where
S: Service<http::request::Request<tonic::transport::Body>, Response = Response<BoxBody>, Error = Infallible>
    + NamedService 
    + Clone
    + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, mut req: http::request::Request<tonic::transport::Body>) -> Self::Future {
        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
        // for details on why this is necessary
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);

        req.extensions_mut().insert(RegistryApplicationState {
            pool: self.pool.clone(),
        });

        Box::pin(async move {
            // Do extra async work here...
            let response = inner.call(req).await?;

            Ok(response)
        })
    }
}

main.rs

use middleware::RegistryApplicationStateLayer;
use tonic::transport::Server;

use client::registry_service_server::RegistryServiceServer;
use server::DemoClient;


use data_access::db::ConnectionPool;

pub mod client;
mod middleware;
pub mod server;

mod store_proto {
  include!("client.rs");

  pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
    tonic::include_file_descriptor_set!("store_descriptor");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  dotenv::dotenv().ok();
  let addr = "127.0.0.1:9001".parse()?;
  let demo_registry_service = DemoClient::default();

  let reflection_service = tonic_reflection::server::Builder::configure()
    .register_encoded_file_descriptor_set(store_proto::FILE_DESCRIPTOR_SET)
    .build()
    .unwrap();

    let pool = ConnectionPool::default();


  Server::builder()
    .layer(RegistryApplicationStateLayer { pool })
    .add_service(RegistryServiceServer::new(demo_registry_service))
    .add_service(reflection_service)
    .serve(addr)
    .await?;
  Ok(())
}

Another note, this is what the Router service implementation looks like in the tonic lib, which I tried to emulate in my implementation:

/// A [`Service`] router.
#[derive(Debug, Default, Clone)]
pub struct Routes {
    router: axum::Router,
}

impl Routes {
    pub(crate) fn new<S>(svc: S) -> Self
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
        S::Error: Into<crate::Error> + Send,
    {
        let router = axum::Router::new().fallback(unimplemented);
        Self { router }.add_service(svc)
    }

    pub(crate) fn add_service<S>(mut self, svc: S) -> Self
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
        S::Error: Into<crate::Error> + Send,
    {
        let svc = svc.map_response(|res| res.map(axum::body::boxed));
        self.router = self
            .router
            .route_service(&format!("/{}/*rest", S::NAME), svc);
        self
    }

    pub(crate) fn prepare(self) -> Self {
        Self {
            // this makes axum perform update some internals of the router that improves perf
            // see https://docs.rs/axum/latest/axum/routing/struct.Router.html#a-note-about-performance
            router: self.router.with_state(()),
        }
    }
}

Lastly, here's my Cargo.toml file

[package]
name = "registry"
version = "0.1.0"
build = "build.rs"
edition = "2021"
publish = false

[dependencies]
axum = { version = "0.7.5", features = [ "json" ] }
clap = { version = "4.1.4", features = ["derive"] }
dotenv = "0.15.0"
futures = "0.3"
http = "1.1.0"
hyper = "1.2.0"
prost = "0.11"
tokio = { version = "1.37", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tonic-reflection = "0.6.0"
tower-http = { version = "0.5.0", features = ["trace"] }
tower-layer = "0.3.2"
tower-request-id = "0.3.0"
tower-service = "0.3.2"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
bytes = "1.6.0"

[build-dependencies]
tonic-build = "0.8"

[dev-dependencies]
uuid = { version = "1.2.2", features = ["v4", "fast-rng"] }
futures-util = "0.3.25"
anyhow = "1"



Solution

  • Always double-check your dependencies, kids. The problem was I had hyper 1.0 installed and Tonic doesn't support hyper 1.0 yet. Here's the working (compiling) middleware with hyper = "0.14.28"

    
    use std::task::{Context, Poll};
    
    use axum::extract::FromRef;
    use data_access::db::Pool;
    use hyper::body::Body;
    use tonic::body::BoxBody;
    use tower_service::Service;
    
    #[derive(Debug, Clone, Default)]
    pub struct RegistryApplicationState {
      pub pool: Pool,
    }
    
    impl FromRef<RegistryApplicationState> for Pool {
      fn from_ref(app_state: &RegistryApplicationState) -> Pool {
        app_state.pool.clone()
      }
    }
    
    #[derive(Debug, Clone, Default)]
    pub struct RegistryApplicationStateLayer {
      pub pool: Pool,
    }
    
    impl<S> tower_layer::Layer<S> for RegistryApplicationStateLayer {
      type Service = RegistryApplicationStateMiddleware<S>;
    
      fn layer(&self, service: S) -> Self::Service {
        RegistryApplicationStateMiddleware {
          inner: service,
          pool: self.pool.clone(),
        }
      }
    }
    
    #[derive(Debug, Clone)]
    pub struct RegistryApplicationStateMiddleware<S> {
      inner: S,
      pub pool: Pool,
    }
    
    impl<S> Service<hyper::Request<Body>>
      for RegistryApplicationStateMiddleware<S>
      where
          S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
          S::Future: Send + 'static,
      {
      type Response = S::Response;
      type Error = S::Error;
      type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
    
      fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
      }
    
      fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
        // for details on why this is necessary
        let clone = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, clone);
    
        req.extensions_mut().insert(RegistryApplicationState {
          pool: self.pool.clone(),
        });
    
        Box::pin(async move {
          // Do extra async work here...
          let response = inner.call(req).await?;
    
          Ok(response)
        })
      }
    }