I started coding in Rust recently and I'm loving it. I'm working on a project where I want to wrap a C API. In one case I have to define callbacks in Rust that C can call. I let bindgen generate the callbacks. Since the code needs to run somewhat asynchronously, I'm using tokio.
I declare the main function with #[tokio::main]. In main I create two async tasks: one listens on the channels, the other drives the message queue in the C API. When messages are available, I want to send them through a channel from the callback function, so that I can receive them in the task that listens for events. Later I want to forward these messages to several clients via SSE or a GraphQL subscription.
I can't change the C callbacks, as they have to be passable to the C API, and I have to use callbacks; otherwise I don't get the messages.
Simplified, my latest approach looks like this:
use lazy_static::lazy_static;
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
lazy_static! {
    static ref BROADCAST_CONNECT: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel(128));
    static ref BROADCAST_CONNECTIONSTATE: Mutex<(Sender<u32>, Receiver<u32>)> = Mutex::new(channel(128));
}
#[tokio::main]
async fn main() {
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API
    let handle = tokio::spawn(async move { // wait for a channel to have a message
        loop {
            tokio::select! {
                // waiting for a channel to receive a message
                Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
                Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
            }
        }
    });
    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });
    handle.await.unwrap();
    handle2.await.unwrap();
}
// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    // C-API is not async, so use synchronous lock
    match BROADCAST_CONNECT.try_lock() {
        Ok(value) => match value.0.blocking_send(is_connected) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.try_lock() {
        Ok(value) => match value.0.blocking_send(connectionstate) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}
error[E0716]: temporary value dropped while borrowed
--> src/main.rs:37:29
|
35 | / tokio::select! {
36 | | Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
37 | | Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ creates a temporary which is freed while still in use
38 | | }
| | -
| | |
| |_____________temporary value is freed at the end of this statement
| borrow later captured here by closure
|
= note: consider using a `let` binding to create a longer lived value
I understand the message and why this happens, but I can't think of a solution that works.
I have a working example that uses crossbeam channels, but I'd rather use the async channels from tokio, so that I have fewer dependencies and everything is async.
use lazy_static::lazy_static;
use crossbeam::{
    channel::{bounded, Receiver, Sender},
    select,
};
use bindgen::{notify_connect, notify_connectionstate};
lazy_static! {
    static ref BROADCAST_CONNECT: (Sender<bool>, Receiver<bool>) = bounded(128);
    static ref BROADCAST_CONNECTIONSTATE: (Sender<u32>, Receiver<u32>) = bounded(128);
}
#[tokio::main]
async fn main() {
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API
    let handle1 = tokio::spawn(async move {
        loop {
            select! {
                recv(&BROADCAST_CONNECT.1) -> msg => println!("is_connected: {:?}", msg.unwrap()),
                recv(&BROADCAST_CONNECTIONSTATE.1) -> msg => println!("connectionstate: {:?}", msg.unwrap()),
            }
        }
    });
    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });
    handle1.await.unwrap();
    handle2.await.unwrap();
}
// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    match BROADCAST_CONNECT.0.send(is_connected) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    }
}
unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.0.send(connectionstate) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    }
}
One alternative, which I have not gotten to work either, would be to use some sort of local function or closures. But I'm not sure whether this can work at all and, if so, how. Maybe someone has an idea. It would be nice if something like this worked, so I don't have to use lazy_static (I'd rather not have global/static variables in my code).
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};
#[tokio::main]
async fn main() {
    let app = app::App::new();
    let mut broadcast_connect = channel::<bool>(128);
    let mut broadcast_connectionstate = channel::<u32>(128);
    let notify_connect = {
        unsafe extern "C" fn _notify_connect(is_connected: bool) {
            match broadcast_connect.0.blocking_send(is_connected) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };
    let notify_connectionstate = {
        unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
            match broadcast_connectionstate.0.blocking_send(connectionstate) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };
    unsafe { notify_connect(Some(notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(notify_connectionstate)) } // pass the callback function to the C-API
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(msg) = broadcast_connect.1.recv() => println!("{}", msg),
                Some(msg) = broadcast_connectionstate.1.recv() => println!("{}", msg),
            }
        }
    });
    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });
    handle.await.unwrap();
    handle2.await.unwrap();
}
can't capture dynamic environment in a fn item
--> src/main.rs:47:19
|
47 | match broadcast_connectionstate.0.blocking_send(connectionstate) {
| ^^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: use the `|| { ... }` closure form instead
It would be nice if someone had a solution to either of my problems. A completely new approach would be fine too. If channels or tokio are not the way to go, that's also fine. I mainly chose tokio because a crate I'm already using depends on it, so I don't need many more dependencies.
Thank you for reading this far.
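If you make the following changes to your first example, it should work:
Replace tokio::sync::Mutex with std::sync::Mutex so you don't have to use try_lock in the callback.
Run the loop that calls the blocking C function with std::thread::spawn rather than in tokio::spawn. A blocking call inside tokio::spawn stalls a runtime worker thread, and blocking_send panics when it is called from within an asynchronous execution context.
To not store the receiver in the mutex, you can do this: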
static ref BROADCAST_CONNECT: Mutex<Option<Sender<bool>>> = Mutex::new(None);
// in main
let (send, recv) = channel(128);
*BROADCAST_CONNECT.lock().unwrap() = Some(send);
If you want a bounded channel, you can release the lock before sending: first clone the sender, then call drop on the lock, and then use blocking_send to send. With an unbounded channel, this doesn't matter as sending is instant.
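// in C callback
let lock = BROADCAST_CONNECT.lock().unwrap();
let send = lock.as_ref().cloned(); // clone the Sender out of the Option
drop(lock); // release the lock before the potentially blocking send
if let Some(send) = send {
    let _ = send.blocking_send(...);
}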
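The steps in this answer can be put together in a small sketch that runs without any crates. It is only an illustration under stated assumptions: std::sync::mpsc::sync_channel is swapped in for tokio::sync::mpsc::channel (so send plays the role of blocking_send), a const-initialized static replaces lazy_static, and fake_c_message_queue, on_connect, CONNECT_TX and run_demo are hypothetical names standing in for the real C API and callbacks.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Mutex;
use std::thread;

// Global slot for the sender only; the receiver stays with its consumer.
static CONNECT_TX: Mutex<Option<SyncSender<bool>>> = Mutex::new(None);

// Callback with a C-compatible signature (hypothetical stand-in for `_notify_connect`).
extern "C" fn on_connect(is_connected: bool) {
    // Clone the sender while holding the lock, then release the lock
    // so a full channel never blocks other users of the mutex.
    let guard = CONNECT_TX.lock().unwrap();
    let tx: Option<SyncSender<bool>> = guard.as_ref().cloned();
    drop(guard);

    if let Some(tx) = tx {
        // Blocks while the bounded channel is full, like `blocking_send`.
        if let Err(e) = tx.send(is_connected) {
            eprintln!("{e}");
        }
    }
}

// Hypothetical stand-in for the blocking C message pump: it calls the
// registered callback once per pending event.
fn fake_c_message_queue(callback: extern "C" fn(bool), events: &[bool]) {
    for &event in events {
        callback(event);
    }
}

// Wires everything together and returns the events in the order received.
pub fn run_demo() -> Vec<bool> {
    let (tx, rx): (SyncSender<bool>, Receiver<bool>) = sync_channel(2);
    *CONNECT_TX.lock().unwrap() = Some(tx);

    // The blocking pump gets its own OS thread instead of an async task.
    let pump = thread::spawn(|| fake_c_message_queue(on_connect, &[true, false, true]));

    // The sender in the global slot keeps the channel open, so read exactly
    // the three events the fake pump emits rather than waiting for a close.
    let received: Vec<bool> = rx.iter().take(3).collect();
    pump.join().unwrap();
    received
}
```

Calling run_demo() returns the three events in the order the fake pump emitted them. With Tokio, the receiving side would instead be a tokio::sync::mpsc::Receiver owned by the listening task, so select! can call recv() on it directly without going through a mutex.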