I'm currently playing around with iced, a gui framework for rust. The following code is the function for a Subscription.
use iced::futures::{SinkExt, Stream};
use iced::time::Duration;
use iced::widget::{center, text};
use iced::window::close_requests;
use iced::{stream, Element, Subscription};
use iced_futures;
use std::process::exit;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
fn count_subscribe() -> impl Stream<Item = Message> {
println!("subscribing...");
stream::channel(100, |mut output| async move {
output.send(Message::Tick).await;
let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();
let mut counter = Arc::new(Mutex::new(0));
let thread = thread::spawn({
let mut shared_count = counter.clone();
move || loop {
*shared_count.lock().unwrap() += 1;
tx.send(true);
thread::sleep(Duration::from_secs(1));
}
});
//thread.join();
output.send(Message::Tick).await;
loop {
if rx.recv().unwrap() {
{
let c = counter.lock().unwrap();
println!("{}", c);
}
output.send(Message::Tick).await;
}
}
})
}
pub fn main() //-> iced::Result
{
println!("Hello, world!");
let app = iced::application("Testwindow", MyWindow::update, MyWindow::view);
app.centered()
.subscription(MyWindow::subscription)
.exit_on_close_request(false)
.run();
println!("this should not be printed");
}
#[derive(Default)]
struct MyWindow {}
impl MyWindow {
fn subscription(&self) -> Subscription<Message> {
Subscription::batch(vec![
Subscription::run(count_subscribe),
Subscription::map(close_requests(), (|_| Message::CloseRequested)),
])
}
fn update(&mut self, message: Message) {
println!("look, a message!");
match message {
Message::INITIALIZE => {
println!("update gui...");
}
Message::Tick => {
println!("tick!");
}
Message::CloseRequested => {
println!("End!");
exit(0);
}
_ => {}
}
}
fn view(&self) -> Element<Message> {
center(text!("Some text in a window")).into()
}
}
#[derive(Debug, Clone)]
enum Message {
INITIALIZE,
KeyPressed,
Tick,
CloseRequested,
None,
}
At the moment as a proof of concept it is just counting, but later I want to add more complex logic in the thread.
I am using a second stream let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();
to communicate from the thread to the async closure since passing (or cloning) output
does not work.
output.send(Message::Tick).await;
I expect "tick!" written to the console (via the update()
function specified for ice)println!("{}", c);
should print the current value of counter to the consoleIn this state, the println! works but none of the output.send()
do, even the one before creating the thread. If i remove the if rx.recv().unwrap()
, all output.send()
work but obviously I cant get triggered by the thread that way. And I really would rather not let the loop do busy waiting.
I have tried removing the await
but without success. I feel like I am missing a fundamental thing, any help would be appreciated.
Cargo.toml
:
[dependencies]
iced = {version = "0.13.1", features = ["tokio"]}
iced_futures = "0.13.2"
kmdreko's comments were correct. The problem you're seeing is caused by doing a blocking call in an async function, and replacing std's channel with Tokio's does fix it. Here are the minimal changes you need to make to do that, while keeping as much of your other code as possible unchanged:
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,9 +5,9 @@
use iced::{stream, Element, Subscription};
use iced_futures;
use std::process::exit;
-use std::sync::mpsc::{Receiver, Sender};
-use std::sync::{mpsc, Arc, Mutex};
+use std::sync::{Arc, Mutex};
use std::thread;
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
fn count_subscribe() -> impl Stream<Item = Message> {
println!("subscribing...");
@@ -15,7 +15,7 @@
stream::channel(100, |mut output| async move {
output.send(Message::Tick).await;
- let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();
+ let (tx, mut rx): (UnboundedSender<bool>, UnboundedReceiver<bool>) = mpsc::unbounded_channel();
let mut counter = Arc::new(Mutex::new(0));
@@ -30,7 +30,7 @@
//thread.join();
output.send(Message::Tick).await;
loop {
- if rx.recv().unwrap() {
+ if rx.recv().await.unwrap() {
{
let c = counter.lock().unwrap();
println!("{}", c);
You may also need to do cargo add tokio -F sync
, or add tokio = { version = "1.43.0", features = ["sync"] }
to Cargo.toml
.
Note that I used the unbounded tokio::sync::mpsc::unbounded_channel
here because std::sync::mpsc::channel
is unbounded. If you want, you could use tokio::sync::mpsc::channel
instead, which would be more like std::sync::mpsc::sync_channel
. If you made that change, you'd then want to use blocking_send
instead of send
too.