Tags: multithreading, rust, background-process, spawn, ratatui

Run an external command in the background (another thread) and capture its output for rendering in a TUI


I'm writing a small TUI in Rust using ratatui that needs to communicate with external programs, capture their output and render it.

I know the basics of running work in other threads and sending the results via mpsc::channel() or sharing them via Arc<Mutex<...>> structs. But in the case of the TUI, I can't get it to work the way I need.

In general, the app consists of a list in one area and the info for the selected list item in the other area. I made a small example using curl and DOIs: the DOIs make up the list, and the info area should render the BibTeX entry of the selected DOI, fetched with curl -LH "Accept: application/x-bibtex" <doi>.

If I don't run the command that fetches the BibTeX info in a separate thread, list movement is blocked for as long as curl is fetching, which is the normal behaviour for single-threaded apps running long-lasting tasks.

So now I spawn another thread to run the fetch_bibtex() function in the background and capture the output once it's there. In principle this works, but if I move through the list too fast, the rendered info does not belong to the selected item. And after a while curl fails with a "too many requests" error.

What I want, but can't get to work: when I move through the list very fast, I only want to load the info of the item I stop on. This means that, during fast movement, processes that haven't finished yet need to be killed, but I don't know how to do that. I'm not even sure on which event to call the fetch_bibtex() function. I tried different things; nothing worked.
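Independently of killing processes, one cheap way to stop a stale result from overwriting the current one is to tag every request with an increasing id and ignore any reply whose id is no longer the latest. This is a minimal sketch, assuming replies come back over an `mpsc` channel; `Fetcher`, `request` and `poll_latest` are hypothetical names, and `thread::sleep` stands in for the curl call. Note that it only fixes which result is shown: it still starts one job per request, so it does not by itself reduce the number of curl calls.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: each request gets an increasing id and only the
/// reply carrying the most recent id is accepted.
pub struct Fetcher {
    latest_id: u64,
    tx: Sender<(u64, String)>,
    rx: Receiver<(u64, String)>,
}

impl Fetcher {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Self { latest_id: 0, tx, rx }
    }

    /// Starts a background job for `item`; `delay` simulates the slow command.
    pub fn request(&mut self, item: &str, delay: Duration) {
        self.latest_id += 1;
        let (id, tx, item) = (self.latest_id, self.tx.clone(), item.to_string());
        thread::spawn(move || {
            thread::sleep(delay); // stand-in for running curl
            let _ = tx.send((id, format!("info for {item}")));
        });
    }

    /// Non-blocking: drains finished jobs and returns the result of the
    /// latest request if it has arrived; results of older requests are dropped.
    pub fn poll_latest(&mut self) -> Option<String> {
        let mut latest = None;
        for (id, text) in self.rx.try_iter() {
            if id == self.latest_id {
                latest = Some(text);
            }
        }
        latest
    }
}
```

In the TUI loop, `poll_latest()` would be called once per iteration (for example after `event::poll` returns) and its result, if any, stored as the info text.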

It would be nice to render "... Loading" or similar while the info isn't there yet.
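A minimal sketch of the loading placeholder, assuming the info pane is driven by a small state enum instead of a bare `String`; the `Info` type and its variants are hypothetical:

```rust
/// Hypothetical state of the info pane.
#[derive(Debug, Clone, PartialEq)]
pub enum Info {
    /// A fetch is in flight for the selected item.
    Loading,
    /// The fetched BibTeX entry.
    Ready(String),
    /// The external command failed.
    Failed(String),
}

impl Info {
    /// Text to hand to the `Paragraph` widget.
    pub fn text(&self) -> String {
        match self {
            Info::Loading => "... Loading".to_string(),
            Info::Ready(bibtex) => bibtex.clone(),
            Info::Failed(err) => format!("Error: {err}"),
        }
    }
}
```

The key handler would set `Info::Loading` when the selection moves, and the code that receives a result would replace it with `Ready` or `Failed`. Since ratatui's `Paragraph::new` accepts anything convertible into `Text`, which includes `String`, `info.text()` should be passable directly.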

Here is my minimal example, which uses Arc<Mutex<String>> for the info content. I also tried sending the fetched info through an mpsc channel and receiving it on the other side, but the outcome was similar:

use std::{
    process::{Command, Stdio},
    sync::{Arc, Mutex},
    thread,
};

use color_eyre::Result;
use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers};
use ratatui::{
    layout::{Constraint, Layout},
    style::{Modifier, Style},
    widgets::{Block, List, ListState, Paragraph},
    DefaultTerminal, Frame,
};

#[derive(Debug)]
pub struct App {
    running: bool,
    info_text: Arc<Mutex<String>>,
    list: Vec<String>,
    state: ListState,
}

impl App {
    pub fn new() -> Self {
        Self {
            running: false,
            info_text: Arc::new(Mutex::new(String::new())),
            list: vec![
                "http://dx.doi.org/10.1163/9789004524774".into(),
                "http://dx.doi.org/10.1016/j.algal.2015.04.001".into(),
                "https://doi.org/10.1093/acprof:oso/9780199595006.003.0021".into(),
                "https://doi.org/10.1007/978-94-007-4587-2_7".into(),
                "https://doi.org/10.1093/acprof:oso/9780199595006.003.0022".into(),
            ],
            state: ListState::default().with_selected(Some(0)),
        }
    }

    pub fn run(mut self, mut terminal: DefaultTerminal) -> Result<()> {
        self.running = true;
        while self.running {
            terminal.draw(|frame| self.draw(frame))?;
            self.handle_crossterm_events()?;
        }
        Ok(())
    }

    fn draw(&mut self, frame: &mut Frame) {
        self.fetch_bibtex();
        let [left, right] =
            Layout::vertical([Constraint::Fill(1), Constraint::Fill(1)]).areas(frame.area());

        let list = List::new(self.list.clone())
            .block(Block::bordered().title_top("List"))
            .highlight_style(Style::new().add_modifier(Modifier::REVERSED));

        let info = Paragraph::new(self.info_text.lock().unwrap().clone().to_string())
            .block(Block::bordered().title_top("Bibtex-Style"));

        frame.render_stateful_widget(list, left, &mut self.state);
        frame.render_widget(info, right);
    }

    fn handle_crossterm_events(&mut self) -> Result<()> {
        match event::read()? {
            Event::Key(key) if key.kind == KeyEventKind::Press => self.on_key_event(key),
            Event::Mouse(_) => {}
            Event::Resize(_, _) => {}
            _ => {}
        }
        Ok(())
    }

    fn fetch_bibtex(&mut self) {
        let sel_doi = self.list[self.state.selected().unwrap_or(0)].clone();
        let info_str = Arc::clone(&self.info_text);

        thread::spawn(move || {
            let output = Command::new("curl")
                .arg("-LH")
                .arg("Accept: application/x-bibtex")
                .arg(&sel_doi)
                .stdout(Stdio::piped())
                .output()
                .expect("Not running");

            let output_str = String::from_utf8_lossy(&output.stdout).to_string();
            let mut info = info_str.lock().unwrap();
            *info = output_str;
        });
    }

    fn on_key_event(&mut self, key: KeyEvent) {
        match (key.modifiers, key.code) {
            (_, KeyCode::Esc | KeyCode::Char('q'))
            | (KeyModifiers::CONTROL, KeyCode::Char('c') | KeyCode::Char('C')) => self.quit(),
            (_, KeyCode::Down | KeyCode::Char('j')) => {
                if self.state.selected().unwrap() <= 3 {
                    self.state.scroll_down_by(1);
                }
            }
            (_, KeyCode::Up | KeyCode::Char('k')) => {
                self.state.scroll_up_by(1);
            }
            _ => {}
        }
    }

    fn quit(&mut self) {
        self.running = false;
    }
}

fn main() -> color_eyre::Result<()> {
    color_eyre::install()?;
    let terminal = ratatui::init();
    let result = App::new().run(terminal);
    ratatui::restore();
    result
}

Of course, this is only an example. In the real app the list is much longer. curl and BibTeX are only stand-ins that should be available on most UNIX systems: the program I use in my real app is not that widespread, so I chose curl because it also takes a noticeable amount of time to fetch BibTeX entries.

Unfortunately, using a native Rust library, or a C library with a well-designed API that I could call from Rust, is not an option, since no such library exists for the program I'm trying to run as a background process.

I'd appreciate any help on what I need to change. I only started learning Rust a few months ago and don't have a professional programming background yet, so mistakes like this are opportunities to learn!
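In the first example, `draw()` calls `fetch_bibtex()`, so a new curl process is spawned on every loop iteration, i.e. once per terminal event, even for keys that don't change the selection. That is consistent with the "too many requests" error. A sketch of starting a fetch only when the selection actually changes; `SelectionTracker` and `needs_fetch` are hypothetical names:

```rust
/// Hypothetical helper: remembers the last fetched list index so that a
/// fetch is triggered only on a real selection change, not on every redraw.
#[derive(Debug, Default)]
pub struct SelectionTracker {
    last_fetched: Option<usize>,
}

impl SelectionTracker {
    /// Returns `Some(index)` if `selected` differs from the last fetched index.
    pub fn needs_fetch(&mut self, selected: Option<usize>) -> Option<usize> {
        if selected.is_some() && selected != self.last_fetched {
            self.last_fetched = selected;
            selected
        } else {
            None
        }
    }
}
```

It would be called with `self.state.selected()` from the event handler (or after it), not from `draw()`, which is then left to render only.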

Edit: close to the goal

I've reworked everything a bit and now have almost the result I'm looking for. By adding a delay with event::poll, it only fetches the current item's info if I stay on that item for about 500 ms, so when I move through the list fast, I don't start too many background processes.
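With a single 500 ms `event::poll` timeout, the same interval decides both when a fetch starts and how often finished results are checked, so while keys keep arriving, results are never picked up. One possible refinement (a sketch, not the only option) is to track the quiet period explicitly and derive the poll timeout from it; `Debouncer` and its methods are hypothetical names:

```rust
use std::time::{Duration, Instant};

/// Hypothetical debouncer: a fetch becomes due only after the selection
/// has stayed unchanged for `delay`.
pub struct Debouncer {
    delay: Duration,
    changed_at: Option<Instant>,
}

impl Debouncer {
    pub fn new(delay: Duration) -> Self {
        Self { delay, changed_at: None }
    }

    /// Call on every key press that moves the selection.
    pub fn touch(&mut self, now: Instant) {
        self.changed_at = Some(now);
    }

    /// Timeout for `event::poll`: time left until the fetch is due, capped
    /// at `max` so the loop still wakes up regularly to check for results.
    pub fn poll_timeout(&self, now: Instant, max: Duration) -> Duration {
        match self.changed_at {
            Some(t) => self.delay.saturating_sub(now.saturating_duration_since(t)).min(max),
            None => max,
        }
    }

    /// Returns true exactly once, when the quiet period has elapsed.
    pub fn fire(&mut self, now: Instant) -> bool {
        match self.changed_at {
            Some(t) if now.saturating_duration_since(t) >= self.delay => {
                self.changed_at = None;
                true
            }
            _ => false,
        }
    }
}
```

Each loop iteration would poll with `poll_timeout(Instant::now(), Duration::from_millis(50))`, handle any key, start a fetch if `fire(Instant::now())` returns true, and check for finished results regardless of whether a key arrived.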

The only thing that's still not perfect: if I move through the list steadily but not too fast, so that a background process is triggered each time, it still fetches the info for the previous item and draws it briefly before drawing the current item's info. So I need to kill all "old" processes still running in the background on every key press that selects a new item. I've tried many things (bools, AtomicBools, etc.) but couldn't figure out how to do it, since the background thread is blocked as long as curl runs...

Happy about any last hints!

use std::{
    process::{Command, Stdio},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Condvar, Mutex,
    },
    thread,
    time::Duration,
};

use color_eyre::Result;
use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers};
use ratatui::{
    layout::{Constraint, Layout},
    style::{Modifier, Style},
    widgets::{Block, List, ListState, Paragraph},
    DefaultTerminal, Frame,
};

#[derive(Debug)]
pub struct Mailbox {
    finished: Arc<AtomicBool>,
    data: Arc<Mutex<Option<String>>>,
    cond: Arc<Condvar>,
    output: Arc<Mutex<String>>,
}

impl Mailbox {
    fn new() -> Self {
        Self {
            finished: Arc::new(AtomicBool::new(false)),
            data: Arc::new(Mutex::new(None)),
            cond: Arc::new(Condvar::new()),
            output: Arc::new(Mutex::new(String::new())),
        }
    }
}

pub fn run_bg_cmd(
    data: Arc<Mutex<Option<String>>>,
    cond: Arc<Condvar>,
    output_field: Arc<Mutex<String>>,
    finished: Arc<AtomicBool>,
) {
    loop {
        let mut guard = data.lock().unwrap();
        while guard.is_none() {
            guard = cond.wait(guard).unwrap();
        }
        let request = guard.take().unwrap();

        drop(guard);

        let output = Command::new("curl")
            .arg("-LH")
            .arg("Accept: application/x-bibtex")
            .arg(&request)
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output()
            .expect("Not running");

        if output.status.success() {
            finished.store(true, Ordering::Relaxed);
            let output_str = String::from_utf8_lossy(&output.stdout).to_string();

            let mut output_mut = output_field.lock().unwrap();
            *output_mut = output_str;
        }
    }
}

#[derive(Debug)]
pub struct App {
    mb: Mailbox,
    running: bool,
    fetch_info: bool,
    info_text: String,
    list: Vec<String>,
    state: ListState,
}

impl App {
    pub fn new(mb: Mailbox) -> Self {
        Self {
            mb,
            running: false,
            fetch_info: false,
            info_text: String::new(),
            list: vec![
                "http://dx.doi.org/10.1163/9789004524774".into(),
                "http://dx.doi.org/10.1016/j.algal.2015.04.001".into(),
                "https://doi.org/10.1093/acprof:oso/9780199595006.003.0021".into(),
                "https://doi.org/10.1007/978-94-007-4587-2_7".into(),
                "https://doi.org/10.1093/acprof:oso/9780199595006.003.0022".into(),
            ],
            state: ListState::default().with_selected(Some(0)),
        }
    }

    pub fn run(mut self, mut terminal: DefaultTerminal) -> Result<()> {
        self.running = true;
        while self.running {
            terminal.draw(|frame| self.draw(frame))?;
            self.handle_crossterm_events()?;
        }
        Ok(())
    }

    fn draw(&mut self, frame: &mut Frame) {
        let [left, right] =
            Layout::vertical([Constraint::Fill(1), Constraint::Fill(1)]).areas(frame.area());

        let list = List::new(self.list.clone())
            .block(Block::bordered().title_top("List"))
            .highlight_style(Style::new().add_modifier(Modifier::REVERSED));

        let info = Paragraph::new(self.info_text.as_str())
            .block(Block::bordered().title_top("Bibtex-Style"));

        frame.render_stateful_widget(list, left, &mut self.state);
        frame.render_widget(info, right);
    }

    fn handle_crossterm_events(&mut self) -> Result<()> {
        if event::poll(Duration::from_millis(500))? {
            match event::read()? {
                Event::Key(key) if key.kind == KeyEventKind::Press => self.on_key_event(key),
                Event::Mouse(_) => {}
                Event::Resize(_, _) => {}
                _ => {}
            }
        } else {
            if self.fetch_info {
                self.update_info();
            }
            if self.mb.finished.load(Ordering::Relaxed) == true {
                self.info_text = self.mb.output.lock().unwrap().to_string();
                self.mb.finished.store(false, Ordering::Relaxed);
            }
        }
        Ok(())
    }

    fn update_info(&mut self) {
        let sel_doi = self.list[self.state.selected().unwrap_or(0)].clone();
        let mut guard = self.mb.data.lock().unwrap();
        *guard = Some(sel_doi);
        self.mb.cond.notify_one();
        drop(guard);
        self.fetch_info = false;
    }

    fn on_key_event(&mut self, key: KeyEvent) {
        match (key.modifiers, key.code) {
            (_, KeyCode::Esc | KeyCode::Char('q'))
            | (KeyModifiers::CONTROL, KeyCode::Char('c') | KeyCode::Char('C')) => self.quit(),
            (_, KeyCode::Down | KeyCode::Char('j')) => {
                if self.state.selected().unwrap() <= 3 {
                    self.info_text = "... Loading".to_string();
                    self.state.scroll_down_by(1);
                    self.fetch_info = true;
                }
            }
            (_, KeyCode::Up | KeyCode::Char('k')) => {
                self.info_text = "... Loading".to_string();
                self.state.scroll_up_by(1);
                self.fetch_info = true;
            }
            _ => {}
        }
    }

    fn quit(&mut self) {
        self.running = false;
    }
}

fn main() -> color_eyre::Result<()> {
    color_eyre::install()?;

    let mb = Mailbox::new();

    let curl_data = Arc::clone(&mb.data);
    let curl_cond = Arc::clone(&mb.cond);
    let curl_output = Arc::clone(&mb.output);
    let curl_bool = Arc::clone(&mb.finished);

    thread::spawn(move || {
        run_bg_cmd(curl_data, curl_cond, curl_output, curl_bool);
    });

    let terminal = ratatui::init();
    let result = App::new(mb).run(terminal);
    ratatui::restore();
    result
}

I know this must be an easy exercise for experienced Rust programmers, but I just can't get it right now...

Still thankful for every help on this!

Newer Edit: even closer

I now run the curl command as a spawned process and capture its output. It works fine, except when moving through the list steadily but not too fast.

The function handling the background commands in the other thread now looks like this:

pub fn run_bg_cmd(
    fetch_item: Arc<Mutex<Option<String>>>,
    cond: Arc<Condvar>,
    output_val: Arc<Mutex<String>>,
    finished: Arc<AtomicBool>,
    kill_bool: Arc<AtomicBool>,
) {
    loop {
        let mut request = fetch_item.lock().unwrap();
        while request.is_none() {
            request = cond.wait(request).unwrap();
        }
        let cur_request = request.take().unwrap();

        drop(request);

        let mut bg_cmd = Command::new("curl")
            .arg("-LH")
            .arg("Accept: application/x-bibtex")
            .arg(&cur_request)
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .spawn()
            .expect("Not running");

        // let pid = bg_cmd.id();
        loop {
            if kill_bool.load(Ordering::Relaxed) {
                bg_cmd.kill().unwrap();
                break;
            } else if bg_cmd.wait().unwrap().success() {
                let output = bg_cmd.wait_with_output().unwrap();

                finished.store(true, Ordering::Relaxed);
                let output = String::from_utf8_lossy(&output.stdout).to_string();

                let mut output_str = output_val.lock().unwrap();
                *output_str = output;
                break;
            } else {
                continue;
            }
        }
    }
}

I just need to figure out how to kill a still-running process and discard its output when a new one is spawned. I tried some approaches with a loop and some AtomicBools (as above), but that seems to have no effect... Maybe by using the PID in some way?
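The `kill_bool` check above has no effect because `Child::wait()` blocks until curl exits, so the flag is only looked at after the process has already finished. `Child::try_wait()` returns immediately (`Ok(None)` while the child is still running), so polling it lets the thread check the flag in between. Separately, reading stdout only after the child has exited can stall if the output is larger than the pipe buffer, so this sketch drains stdout on a helper thread. It assumes a Unix system; `run_cancellable` is a hypothetical helper name:

```rust
use std::io::{self, Read};
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `cmd` and returns its stdout. Returns `Ok(None)`
/// if `cancel` was set before it finished (the child is then killed) or if
/// it exited with a non-zero status.
pub fn run_cancellable(mut cmd: Command, cancel: &AtomicBool) -> io::Result<Option<String>> {
    let mut child = cmd.stdout(Stdio::piped()).stderr(Stdio::null()).spawn()?;

    // Drain stdout on a helper thread so a large output cannot fill the pipe
    // and block the child while this thread is busy polling.
    let mut stdout = child.stdout.take().expect("stdout was piped");
    let reader = thread::spawn(move || {
        let mut buf = Vec::new();
        stdout.read_to_end(&mut buf).map(|_| buf)
    });

    loop {
        if cancel.load(Ordering::Relaxed) {
            let _ = child.kill(); // may fail if the child exited in the meantime
            child.wait()?; // reap the process so it does not linger as a zombie
            let _ = reader.join();
            return Ok(None);
        }
        // try_wait() never blocks: Ok(None) means the child is still running.
        if let Some(status) = child.try_wait()? {
            let buf = reader.join().expect("reader thread panicked")?;
            return Ok(status.success().then(|| String::from_utf8_lossy(&buf).into_owned()));
        }
        thread::sleep(Duration::from_millis(20));
    }
}
```

In the worker loop, this would replace the `spawn()`/`wait()` loop. One way to wire the flag, assuming the mailbox design: the main thread sets it while holding the mailbox lock when it stores a new request, and the worker clears it while holding the same lock right after taking the next request, so a cancel meant for the old job cannot hit the new one.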


Solution

  • Don't spawn a thread for each request, only spawn a single thread at program start, then use a mailbox to send requests to that thread. That way you won't be starting multiple curl requests in parallel and you will only keep the last result.

    There's probably a crate implementing a mailbox somewhere, but it's pretty easy to do yourself with a Mutex and a Condvar from std::sync. Something like:

    use std::sync::{mpsc::Sender, Arc, Condvar, Mutex};

    struct Mailbox {
        data: Mutex<Option<String>>,
        cond: Condvar,
    }

    fn curl_thread(mailbox: Arc<Mailbox>, reply: Sender<String>) {
        loop {
            let mut guard = mailbox.data.lock().unwrap();
            while guard.is_none() {
                guard = mailbox.cond.wait(guard).unwrap();
            }
            let request = guard.take().unwrap();
            drop(guard); // Release the lock so that the main thread may proceed while we're running

            todo!("Call curl (or whatever) with `request` and send the result using the `reply` channel.");
        }
    }

    // Send a new request to the `curl_thread`. Note that:
    // - This will not interrupt the currently running request if there is one
    //   (left as an exercise to the reader)
    // - This will only keep at most one pending request, so that if multiple
    //   requests are sent while the first one is running, only the last one
    //   will be kept.
    fn send_request(mailbox: &Mailbox, request: String) {
        let mut guard = mailbox.data.lock().unwrap();
        *guard = Some(request);
        mailbox.cond.notify_one();
        drop(guard);
    }
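For reference, here is a self-contained variant of the mailbox sketch above, with the `todo!()` replaced by a caller-supplied closure so it can be run without curl. The names `post`, `take` and `spawn_worker` are illustrative, not from any crate. Sending the request back together with its result lets the UI compare it with the currently selected item before displaying it:

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Single-slot mailbox: posting overwrites any request not yet picked up.
pub struct Mailbox {
    data: Mutex<Option<String>>,
    cond: Condvar,
}

impl Mailbox {
    pub fn new() -> Arc<Self> {
        Arc::new(Self { data: Mutex::new(None), cond: Condvar::new() })
    }

    /// Replaces any pending request and wakes the worker.
    pub fn post(&self, request: String) {
        *self.data.lock().unwrap() = Some(request);
        self.cond.notify_one();
    }

    /// Blocks until a request is available, then takes it out of the slot.
    fn take(&self) -> String {
        let mut guard = self.data.lock().unwrap();
        while guard.is_none() {
            guard = self.cond.wait(guard).unwrap();
        }
        guard.take().unwrap()
    }
}

/// Starts the single worker thread; `job` stands in for running curl.
pub fn spawn_worker<F>(mailbox: Arc<Mailbox>, job: F) -> Receiver<(String, String)>
where
    F: Fn(&str) -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || loop {
        let request = mailbox.take();
        let result = job(&request);
        // Return the request with its result so the UI can check it still matches.
        if tx.send((request, result)).is_err() {
            break; // the receiving side was dropped
        }
    });
    rx
}
```

If three requests are posted while the worker is busy with the first, the second is overwritten by the third before the worker gets to it, so only the first and the last are processed.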