I am trying to make a change to the Rust Tonic UDS gRPC client example. It contains the following code block (see link):
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(|_: Uri| async {
        let path = "/tmp/tonic/helloworld";
        // Connect to a Uds socket
        Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
    }))
    .await?;
The change is super simple: I just want to be able to pass the path in from the outside rather than setting it to a fixed value. The problem is that the compiler either complains that the function inside service_fn outlives any path object defined outside of it or, if I use move to pass it in, complains that service_fn expects a FnMut but we're now giving it a FnOnce.
Both errors make sense to me: the method can be called from a separate thread, long after we're past the lifetime of any path object defined outside of it. And if we move it into the function, that move can only be done once, so the function can't be called multiple times any more, as it might need to be for servicing multiple requests.
I found an answer here on SO that seems to do exactly what I need, but I just can't get it to work. I'm sure I'm missing something obvious, but I just can't figure out what it is.
The simplest approach is to clone everything. When everything is an owned value you don't have to deal with lifetimes whatsoever.
// for demonstrating the more general case, I'm using a non-Copy type here
let path = PathBuf::from("your/path/here");
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(move |_: Uri| {
        let path = path.clone();
        async move {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
        }
    }))
    .await?;
This may be appropriate if your code isn't performance sensitive or if the data is small. If you want to avoid cloning the data itself on every call, you have to do a bit more work:
let path = PathBuf::from("your/path/here");
let path_shared = Arc::new(path);
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(move |_: Uri| {
        let path_shared = Arc::clone(&path_shared);
        async move {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(
                UnixStream::connect(path_shared.as_ref()).await?,
            ))
        }
    }))
    .await?;
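If you want to play with this pattern without pulling in tonic or tokio, here is a minimal std-only sketch. The names run_connector_twice and connect_twice are hypothetical stand-ins: the first imitates the kind of bounds a connector has to satisfy (callable repeatedly, Send, 'static), and the inner closure plays the role of the async block that must own its data. It is only an analogy, not how tonic is implemented.

```rust
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for `service_fn` + `connect_with_connector`:
// the connector must be callable more than once, `Send`, and `'static`,
// and each call returns a "job" (standing in for the future) that owns its data.
fn run_connector_twice<F, J>(mut connector: F) -> Vec<String>
where
    F: FnMut() -> J + Send + 'static,
    J: FnOnce() -> String + Send + 'static,
{
    thread::spawn(move || {
        let first = connector();
        let second = connector();
        vec![first(), second()]
    })
    .join()
    .expect("worker thread panicked")
}

// Hypothetical caller that receives the path from the outside.
fn connect_twice(path: PathBuf) -> Vec<String> {
    let path_shared = Arc::new(path);
    run_connector_twice(move || {
        // Only the reference count is bumped here; the path bytes are not copied.
        let path_shared = Arc::clone(&path_shared);
        // The inner closure takes ownership of the clone, like `async move` does.
        move || format!("connecting to {}", path_shared.display())
    })
}

fn main() {
    for line in connect_twice(PathBuf::from("/tmp/tonic/helloworld")) {
        println!("{line}");
    }
}
```

The outer closure keeps its own Arc for the next call and hands each job a fresh clone, which is why it can be called repeatedly.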
Let's see how we got here. We'll start with a naive implementation, and let the compiler guide us.
This is what we want, written as simply as possible:
let path = PathBuf::from("your/path/here");
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(|_: Uri| {
        async {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
        }
    }))
    .await?;
And this doesn't compile:
error[E0525]: expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
--> examples/src/uds/client.rs:26:44
|
26 | .connect_with_connector(service_fn(|_: Uri| {
| ---------------------- ^^^^^^^^ this closure implements `FnOnce`, not `FnMut`
| |
| the requirement to implement `FnMut` derives from here
...
29 | Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
| ---- closure is `FnOnce` because it moves the variable `path` out of its environment
|
= note: required for `ServiceFn<{closure@examples/src/uds/client.rs:26:44: 26:52}>` to implement `Service<Uri>`
Okay, this makes sense. We moved in an external value and consumed it, so of course the closure is FnOnce. Since UnixStream::connect is generic (it accepts any P: AsRef<Path>), we will try passing it a &path instead.
Note that the move keyword is not necessary for a closure to capture by value; it only forces capturing by value in cases where it normally wouldn't (ref).
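As a small std-only illustration of that note (the helper names call_once and call_twice are made up for this sketch), the first closure below has no move keyword yet is only FnOnce because its body consumes the captured value, while the second uses move and can still be called repeatedly because its body only reads the value:

```rust
// Helper functions whose bounds make the compiler check which
// closure traits a given closure implements.
fn call_once<F: FnOnce() -> String>(f: F) -> String {
    f()
}

fn call_twice<F: FnMut() -> usize>(mut f: F) -> (usize, usize) {
    (f(), f())
}

fn consumes_without_move() -> String {
    let path = String::from("/tmp/tonic/helloworld");
    // No `move` keyword, yet `path` is captured by value because the body
    // moves it out. Passing this closure to `call_twice` would not compile.
    let consume = || path;
    call_once(consume)
}

fn reads_with_move() -> (usize, usize) {
    let path = String::from("/tmp/tonic/helloworld");
    // `move` forces a by-value capture, but the body only reads `path`,
    // so the closure can still be called more than once.
    let read = move || path.len();
    call_twice(read)
}

fn main() {
    println!("{}", consumes_without_move());
    println!("{:?}", reads_with_move());
}
```

With that aside out of the way, here is the attempt that passes &path: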
let path = PathBuf::from("your/path/here");
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(|_: Uri| {
        async {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
        }
    }))
    .await?;
error[E0373]: closure may outlive the current function, but it borrows `path`, which is owned by the current function
--> examples/src/uds/client.rs:26:44
|
26 | .connect_with_connector(service_fn(|_: Uri| {
| ^^^^^^^^ may outlive borrowed value `path`
...
29 | Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
| ---- `path` is borrowed here
|
note: function requires argument type to outlive `'static`
--> examples/src/uds/client.rs:26:33
|
26 | .connect_with_connector(service_fn(|_: Uri| {
| _________________________________^
27 | | async {
28 | | // Connect to a Uds socket
29 | | Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
30 | | }
31 | | }))
| |__________^
help: to force the closure to take ownership of `path` (and any other referenced variables), use the `move` keyword
|
26 | .connect_with_connector(service_fn(move |_: Uri| {
| ++++
Okay, lifetime issue, makes sense. The closure could run on a thread long after this function has returned. Let's add move.
let path = PathBuf::from("your/path/here");
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(move |_: Uri| {
        async {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
        }
    }))
    .await?;
error: lifetime may not live long enough
--> examples/src/uds/client.rs:27:13
|
26 | .connect_with_connector(service_fn(move |_: Uri| {
| -------------
| | |
| | return type of closure `{async block@examples/src/uds/client.rs:27:13: 27:18}` contains a lifetime `'2`
| lifetime `'1` represents this closure's body
27 | / async {
28 | | // Connect to a Uds socket
29 | | Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
30 | | }
| |_____________^ returning this value requires that `'1` must outlive `'2`
|
= note: closure implements `Fn`, so references to captured variables can't escape the closure
This error message may appear a bit cryptic, especially the annotations on '1 and '2. But the note at the bottom is telling: we just aren't allowed to keep a reference to data within the closure in our async block. Async blocks capture variables using the same rules as closures (ref), so let's make it async move.
let path = PathBuf::from("your/path/here");
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(move |_: Uri| {
        async move {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
        }
    }))
    .await?;
error[E0525]: expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
--> examples/src/uds/client.rs:26:44
|
26 | .connect_with_connector(service_fn(move |_: Uri| {
| ---------------------- ^^^^^^^^^^^^^ this closure implements `FnOnce`, not `FnMut`
| |
| the requirement to implement `FnMut` derives from here
...
29 | Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
| ---- closure is `FnOnce` because it moves the variable `path` out of its environment
|
= note: required for `ServiceFn<{closure@examples/src/uds/client.rs:26:44: 26:57}>` to implement `Service<Uri>`
Okay, so our closure is FnOnce again. Why? Again the compiler tells us: it's because we moved path into our async block. We can't do that. For the closure to be FnMut (and in fact, Fn), path has to stay in the closure. In other words, we want shared ownership. In Rust, we do that using reference-counting smart pointers. Let's do that.
let path = PathBuf::from("your/path/here");
let path_shared = Rc::new(path);
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(move |_: Uri| {
        // path_shared gets moved into the closure
        // then we create another owned pointer to the same data
        let path_shared_2 = Rc::clone(&path_shared);
        async move {
            // Connect to a Uds socket
            // path_shared_2 gets moved into the async block
            // but path_shared stays put
            Ok::<_, std::io::Error>(TokioIo::new(
                UnixStream::connect(path_shared_2.as_ref()).await?,
            ))
        }
    }))
    .await?;
error[E0277]: `Rc<PathBuf>` cannot be sent between threads safely
--> examples/src/uds/client.rs:27:33
|
27 | .connect_with_connector(service_fn(move |_: Uri| {
| __________----------------------_^
| | |
| | required by a bound introduced by this call
28 | | // path_shared gets moved into the closure
29 | | // then we create another owned pointer to the same data
30 | | let path_shared_2 = Rc::clone(&path_shared);
... |
38 | | }
39 | | }))
| |__________^ `Rc<PathBuf>` cannot be sent between threads safely
|
= help: within `ServiceFn<{closure@examples/src/uds/client.rs:27:44: 27:57}>`, the trait `Send` is not implemented for `Rc<PathBuf>`, which is required by `ServiceFn<{closure@examples/src/uds/client.rs:27:44: 27:57}>: Send`
note: required because it's used within this closure
--> examples/src/uds/client.rs:27:44
|
27 | .connect_with_connector(service_fn(move |_: Uri| {
| ^^^^^^^^^^^^^
note: required because it appears within the type `ServiceFn<{closure@examples/src/uds/client.rs:27:44: 27:57}>`
--> /home/cyq/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tower-0.4.13/src/util/service_fn.rs:54:12
|
54 | pub struct ServiceFn<T> {
| ^^^^^^^^^
note: required by a bound in `Endpoint::connect_with_connector`
--> /home/cyq/Repos/Public/tonic/tonic/src/transport/channel/endpoint.rs:366:27
|
364 | pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
| ---------------------- required by a bound in this associated function
365 | where
366 | C: Service<Uri> + Send + 'static,
| ^^^^ required by this bound in `Endpoint::connect_with_connector`
So Endpoint::connect_with_connector wants a Send connector. What does this mean? The documentation of Send tells us: it's a type that can be transferred across thread boundaries (ref). The specific reason it wants this is that tokio by default uses a multi-threaded executor. It is possible to configure tokio to run single-threaded, where locally spawned tasks don't need to be Send, but it's seldom done in practice, and it wouldn't remove the bound from this signature.
So why isn't our connector Send? Again the error tells us: Rc<PathBuf> cannot be sent between threads safely. Why isn't Rc Send? We can find it in the docs:
Rc uses non-atomic reference counting. This means that overhead is very low, but an Rc cannot be sent between threads, and consequently Rc does not implement Send. As a result, the Rust compiler will check at compile time that you are not sending Rcs between threads. If you need multi-threaded, atomic reference counting, use sync::Arc.
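The difference can be checked at compile time with a tiny std-only sketch. The helper require_send and the two functions around it are hypothetical names for illustration; the commented-out line is the one the compiler would reject for the same reason as in the error above.

```rust
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

// Compiles only when the argument type is `Send`.
fn require_send<T: Send>(value: T) -> T {
    value
}

fn path_len_on_other_thread(path: PathBuf) -> usize {
    // `Arc<PathBuf>` passes the `Send` check, so a clone can cross threads.
    let shared = require_send(Arc::new(path));
    let for_thread = Arc::clone(&shared);
    thread::spawn(move || for_thread.as_os_str().len())
        .join()
        .expect("worker thread panicked")
}

fn rc_stays_on_one_thread(path: PathBuf) -> usize {
    let shared = Rc::new(path);
    let other = Rc::clone(&shared);
    // require_send(Rc::clone(&shared)); // would not compile: `Rc<PathBuf>` is not `Send`
    // Both handles are alive here and point to the same allocation.
    Rc::strong_count(&other)
}

fn main() {
    println!("{}", path_len_on_other_thread(PathBuf::from("/tmp/tonic/helloworld")));
    println!("{}", rc_stays_on_one_thread(PathBuf::from("/tmp/tonic/helloworld")));
}
```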
So let's use Arc instead.
let path = PathBuf::from("your/path/here");
let path_shared = Arc::new(path);
let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(move |_: Uri| {
        let path_shared_2 = Arc::clone(&path_shared);
        async move {
            // Connect to a Uds socket
            Ok::<_, std::io::Error>(TokioIo::new(
                UnixStream::connect(path_shared_2.as_ref()).await?,
            ))
        }
    }))
    .await?;
And finally we have arrived at our final code. Note that I used the name path_shared_2 here for clarity; in practice people usually just shadow path_shared like so: let path_shared = Arc::clone(&path_shared);
The most distinctive characteristic of Rust is that it doesn't let you compile until everything is perfect, and getting to perfection is hard. Some people find it frustrating, but personally I like it because I can rest assured that there's not going to be an urgent call that wakes me up at 0300. But as you can see here, the Rust compiler can often do a tremendous job of guiding you to the right answer if you follow its advice step by step.