rust iron

How can I use Server-Sent Events in Iron?


I have a small Rust application that receives requests through a serial port, does some processing, and saves the results locally. I want to use a browser as a remote monitor so I can see everything that is happening, and as I understand it, Server-Sent Events (SSE) are a good fit for that.

I tried using Iron for that, but I can't find a way to keep the connection open. Every request handler has to return a Response, so I can't keep sending data after that.

This was my (dumb) attempt:

fn monitor(req: &mut Request) -> IronResult<Response> {
    let mut headers = Headers::new();
    headers.set(ContentType(Mime(TopLevel::Text, SubLevel::EventStream, vec![])));
    headers.set(CacheControl(vec![CacheDirective::NoCache]));

    println!("{:?}", req);

    let mut count = 0;
    loop {
        let mut response = Response::with((iron::status::Ok, format!("data: Count!:{}", count)));
        response.headers = headers.clone();
        return Ok(response); //obviously won't do what I want

        count += 1;
        std::thread::sleep_ms(1000);
    }
}

Solution

  • I think the short answer is: you can't. The current version of Iron is built around a single request-response interaction. You can see this in your code: the only way to send a response is to return it, which ends the handler.

    There's an issue in Iron to use the new async support in Hyper, which itself was merged relatively recently. There are even other people trying to use Server-Sent Events in Hyper who haven't succeeded yet.


    If you are willing to use the Hyper master branch, something like this seems to work. No guarantees that this is a good solution or that it doesn't eat up all your RAM or CPU. It seems to work in Chrome though.

    extern crate hyper;
    
    use std::time::{Duration, Instant};
    use std::io::prelude::*;
    
    use hyper::{Control, Encoder, Decoder, Next};
    use hyper::server::{Server, HandlerFactory, Handler, Request, Response};
    use hyper::status::StatusCode;
    use hyper::header::ContentType;
    use hyper::net::HttpStream;
    
    
    fn main() {
        let address = "0.0.0.0:7777".parse().expect("Invalid address");
        let server = Server::http(&address).expect("Invalid server");
    
        let (_listen, server_loop) = server.handle(MyFactory).expect("Failed to handle");
    
        println!("Starting...");
        server_loop.run();
    }
    
    
    struct MyFactory;
    
    impl HandlerFactory<HttpStream> for MyFactory {
        type Output = MyHandler;
    
        fn create(&mut self, ctrl: Control) -> Self::Output {
            MyHandler {
                control: ctrl,
            }
        }
    }
    
    
    struct MyHandler {
        control: Control,
    }
    
    impl Handler<HttpStream> for MyHandler {
        fn on_request(&mut self, _request: Request<HttpStream>) -> Next {
            println!("A request was made");
            Next::write()
        }
    
        fn on_request_readable(&mut self, _request: &mut Decoder<HttpStream>) -> Next {
            println!("Request has data to read");
            Next::write()
        }
    
        fn on_response(&mut self, response: &mut Response) -> Next {
            println!("A response is ready to be sent");
    
            response.set_status(StatusCode::Ok);
            let mime = "text/event-stream".parse().expect("Invalid MIME");
            response.headers_mut().set(ContentType(mime));
    
            every_duration(Duration::from_secs(1), self.control.clone());
    
            Next::wait()
        }
    
        fn on_response_writable(&mut self, response: &mut Encoder<HttpStream>) -> Next {
            println!("A response can be written");
    
            // Waited long enough, send some data
            let fake_data = r#"event: userconnect
    data: {"username": "bobby", "time": "02:33:48"}"#;
    
            println!("Writing some data");
            response.write_all(fake_data.as_bytes()).expect("Failed to write");
            response.write_all(b"\n\n").expect("Failed to write");
    
            Next::wait()
        }
    }
    
    use std::thread;
    
    fn every_duration(max_elapsed: Duration, control: Control) {
        let mut last_sent: Option<Instant> = None;
        let mut count = 0;
    
        thread::spawn(move || {
            loop {
                // Terminate after a fixed number of messages
                if count >= 5 {
                    println!("Maximum messages sent, ending");
                    control.ready(Next::end()).expect("Failed to trigger end");
                    return;
                }
    
                // Wait a little while between messages
                if let Some(last) = last_sent {
                    let elapsed = last.elapsed();
                    println!("It's been {:?} since the last message", elapsed);
    
                    if elapsed < max_elapsed {
                        let remaining = max_elapsed - elapsed;
                        println!("There's {:?} remaining", remaining);
                        thread::sleep(remaining);
                    }
                }
    
                // Trigger a message
                control.ready(Next::write()).expect("Failed to trigger write");
    
                last_sent = Some(Instant::now());
                count += 1;
            }
        });
    }
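
    The two write_all calls in on_response_writable build a single event by hand. As a rough sketch of that wire format: an optional "event:" line, one "data:" line per line of payload, and a blank line that ends the event. The helper name sse_frame is made up for illustration; it is not part of Hyper or Iron.

```rust
/// Formats one Server-Sent Events frame.
/// `sse_frame` is a hypothetical helper, not a Hyper or Iron API.
fn sse_frame(event: Option<&str>, data: &str) -> String {
    let mut frame = String::new();

    // The event name is optional; the client can listen for it by name.
    if let Some(name) = event {
        frame.push_str("event: ");
        frame.push_str(name);
        frame.push('\n');
    }

    // Each line of the payload gets its own `data:` field.
    for line in data.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }

    // A blank line terminates the frame.
    frame.push('\n');
    frame
}

fn main() {
    let payload = r#"{"username": "bobby", "time": "02:33:48"}"#;
    print!("{}", sse_frame(Some("userconnect"), payload));
}
```

    The resulting string could be passed to write_all in place of fake_data plus the trailing newlines.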
    

    And the client-side JS:

    var evtSource = new EventSource("http://127.0.0.1:7777");
    
    evtSource.addEventListener("userconnect", function(e) {
        const obj = JSON.parse(e.data);
        console.log(obj);
    }, false);
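
    If depending on Hyper's master branch is not an option, the same idea can be sketched with only the standard library: accept a TCP connection, write a response head with the text/event-stream content type, and keep writing events to the open socket. This is an illustrative alternative, not something tested alongside the Hyper code above. The function names, the thread-per-connection design, the Access-Control-Allow-Origin header, and the count payload are all assumptions, and the request is skipped rather than parsed.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

// Assumed response head; the CORS header is only there so a page served
// from another origin can connect while experimenting.
const RESPONSE_HEAD: &str = "HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
Cache-Control: no-cache\r\n\
Access-Control-Allow-Origin: *\r\n\
Connection: close\r\n\
\r\n";

/// Reads and discards the request line and headers (no real HTTP parsing).
fn skip_request_head(stream: &TcpStream) -> io::Result<()> {
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    loop {
        line.clear();
        let read = reader.read_line(&mut line)?;
        // Stop at end of input or at the blank line that ends the headers.
        if read == 0 || line == "\r\n" || line == "\n" {
            return Ok(());
        }
    }
}

/// Writes the response head, then `count` events with `pause` after each one.
fn stream_events<W: Write>(out: &mut W, count: u32, pause: Duration) -> io::Result<()> {
    out.write_all(RESPONSE_HEAD.as_bytes())?;
    for n in 0..count {
        // Same event name as the JS listener above; the payload is made up.
        write!(out, "event: userconnect\ndata: {{\"count\": {}}}\n\n", n)?;
        out.flush()?;
        thread::sleep(pause);
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:7777")?;
    for stream in listener.incoming() {
        let mut stream = stream?;
        // One thread per client keeps the sketch simple.
        thread::spawn(move || {
            let result = skip_request_head(&stream)
                .and_then(|_| stream_events(&mut stream, 5, Duration::from_secs(1)));
            if let Err(e) = result {
                eprintln!("Connection ended with an error: {}", e);
            }
        });
    }
    Ok(())
}
```

    As with the Hyper version, the stream ends after five events, so a browser's EventSource will normally try to reconnect once the socket closes.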