rustwebsocketrust-actix

Build a WebSocket client using Actix


I previously posted a question about how to Add awc websocket client to add_stream in actix, which focused on how to add a stream to the actor from the AWC Client. I have solved that issue, but I still need to be able to communicate with the server by sending messages.

So, let's start with some context. Here is my actor:

use actix_web_actors::ws::{Frame, ProtocolError};
use awc::BoxedSocket;
use awc::ws::Codec;
use futures::StreamExt;
use log::info;
use openssl::ssl::SslConnector;

pub struct RosClient {
    pub address: String,
    pub connection: Option<Framed<BoxedSocket, Codec>>,
    pub hb: Instant,
}

impl RosClient {
    pub fn new(address: &str) -> Self {
        Self {
            address: address.to_string(),
            connection: None,
            hb: Instant::now(),
        }
    }
}

impl Actor for RosClient {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        info!("Connecting to ROS client on {}", &self.address);
        let ssl = {
            let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
            let _ = ssl.set_alpn_protos(b"\x08http/1.1");
            ssl.build()
        };
        let connector = awc::Connector::new().ssl(ssl).finish();
        let ws = awc::ClientBuilder::new()
            .connector(connector)
            .finish()
            .ws(&self.address)
            .set_header("Host", "0.0.0.0:9090");

        let _message = serde_json::json!({
            "op": "subscribe",
            "topic": "/client_count"
        });

        ws.connect()
            .into_actor(self)
            .map(|res, _act, ctx| match res {
                Ok((client_response, frame)) => {
                    info!("Response: {:?}", client_response);
                    let (_r, w) = frame.split();
                    let _ = ctx.add_stream(w);
                }
                Err(err) => {
                    info!("Websocket Client Actor failed to connect: {:?}", err);
                    ctx.stop();
                }
            })
            .wait(ctx);
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        Running::Stop
    }
}

impl StreamHandler<Result<Frame, ProtocolError>> for RosClient {
    fn handle(&mut self, item: Result<Frame, ProtocolError>, _ctx: &mut Self::Context) {
        match item.unwrap() {
            Frame::Text(text_bytes) => {
                let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
                info!("Message: {}", text);
            }
            Frame::Binary(_) => {}
            Frame::Continuation(_) => {}
            Frame::Ping(_) => {
                info!("Ping received!");
            }
            Frame::Pong(_) => {
                self.hb = Instant::now();
            }
            Frame::Close(_) => {}
        }
    }
}

How do I keep a reference (or a handle, or a copy, or anything that does the job) to the connection, so that when I implement the message handlers, I can send data to the server via e.g.,

Message::Text(message.to_string()) 

Solution

  • After tweaking some things around, I got it working. Even from your previous question, the problem is how you are approaching the connection creation. A good reference is in the crate actix-web-actors where the pattern is something like:

    pub fn start_with_addr<A, T>(
        actor: A, 
        req: &HttpRequest, 
        stream: T
    ) -> Result<(Addr<A>, HttpResponse), Error> 
    

    In your case, this is what I came up with:

    use actix::io::SinkWrite;
    use actix::prelude::*;
    use actix_codec::Framed;
    use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
    use futures::stream::{SplitSink, SplitStream};
    use futures_util::stream::StreamExt;
    use log::{error, info};
    use openssl::ssl::SslConnector;
    
    type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
    type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
    struct RosClient {
        sink: SinkWrite<ws::Message, WsFramedSink>,
    }
    
    impl RosClient {
        pub fn start(sink: WsFramedSink, stream: WsFramedStream) -> Addr<Self> {
            RosClient::create(|ctx| {
                ctx.add_stream(stream);
                RosClient {
                    sink: SinkWrite::new(sink, ctx),
                }
            })
        }
    }
    impl Actor for RosClient {
        type Context = Context<Self>;
    
        fn started(&mut self, _ctx: &mut Context<Self>) {
            info!("RosClient started");
        }
    }
    
    impl actix::io::WriteHandler<WsProtocolError> for RosClient {}
    
    #[derive(Message, Debug)]
    #[rtype(result = "()")]
    struct Event {
        op: String,
        topic: String,
    }
    impl Handler<Event> for RosClient {
        type Result = ();
    
        fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
            info!("Pushing Message {:?}", msg);
            if let Some(error) = self
                .sink
                .write(ws::Message::Text(format!("{:?}", msg).into()))
            {
                error!("Error RosClient {:?}", error);
            }
        }
    }
    
    impl StreamHandler<Result<ws::Frame, WsProtocolError>> for RosClient {
        fn handle(&mut self, item: Result<ws::Frame, WsProtocolError>, _ctx: &mut Self::Context) {
            use ws::Frame;
            match item.unwrap() {
                Frame::Text(text_bytes) => {
                    let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
                    info!("Receiving Message: {}", text);
                }
                Frame::Binary(_) => {}
                Frame::Continuation(_) => {}
                Frame::Ping(_) => {
                    info!("Ping received!");
                }
                Frame::Pong(_) => {
                    //self.hb = Instant::now();
                }
                Frame::Close(_) => {}
            }
        }
    }
    
    #[actix_rt::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        std::env::set_var("RUST_LOG", "info");
        env_logger::init();
        let _ssl = {
            let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
            let _ = ssl.set_alpn_protos(b"\x08http/1.1");
            ssl.build()
        };
        //let connector = awc::Connector::new().ssl(ssl).finish();
        let (_, framed) = Client::default()
            .ws("http://localhost:8080")
            .connect()
            .await?;
        let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
        let addr = RosClient::start(sink, stream);
    
        let _res = addr
            .send(Event {
                op: format!("subscribe"),
                topic: "/client_count".to_string(),
            })
            .await
            .unwrap();
        let _ = actix_rt::signal::ctrl_c().await?;
        Ok(())
    }
    

    I wrote a simple node.js server (without ssl):

    const { WebSocketServer } = require('ws');
    const wss = new WebSocketServer({ port: 8080 });
    
    wss.on('connection', function connection(ws) {
        ws.on('message', function message(data) {
            console.log('received: %s', data);
        });
    
        ws.send('something');
    });
    

    and it works perfectly:

    [2021-12-01T07:08:03Z INFO  actix-wc-client] RosClient started
    [2021-12-01T07:08:03Z INFO  actix-wc-client] Pushing Message Event { op: "subscribe", topic: "/client_count" }
    [2021-12-01T07:08:03Z INFO  actix-wc-client] Receiving Message: something
    

    You may need to update some of actix-* versions. Here is my Cargo.toml file:

    [package]
    name = "actix-wc-client"
    version = "0.1.0"
    edition = "2018"
    
    [dependencies]
    awc = "3.0.0-beta.9"
    openssl = { version = "0.10" }
    log = { version = "0.4" }
    futures = "0.3"
    actix = "0.11"
    actix-web = "4.0.0-beta.10"
    serde = "1"
    serde_json = "1"
    actix-codec = "0.4"
    actix-rt = "2.5"
    futures-util = "0.3"
    actix-http = "3.0.0-beta.11"
    env_logger = "0.7"