I previously posted a question about how to Add awc websocket client to add_stream in actix, which focused on how to add a stream to the actor from the AWC Client. I have solved that issue, but I still need to be able to communicate with the server by sending messages.
So, let's start with some context. Here is my actor:
use actix_web_actors::ws::{Frame, ProtocolError};
use awc::BoxedSocket;
use awc::ws::Codec;
use futures::StreamExt;
use log::info;
use openssl::ssl::SslConnector;
pub struct RosClient {
pub address: String,
pub connection: Option<Framed<BoxedSocket, Codec>>,
pub hb: Instant,
}
impl RosClient {
pub fn new(address: &str) -> Self {
Self {
address: address.to_string(),
connection: None,
hb: Instant::now(),
}
}
}
impl Actor for RosClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Connecting to ROS client on {}", &self.address);
let ssl = {
let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
let _ = ssl.set_alpn_protos(b"\x08http/1.1");
ssl.build()
};
let connector = awc::Connector::new().ssl(ssl).finish();
let ws = awc::ClientBuilder::new()
.connector(connector)
.finish()
.ws(&self.address)
.set_header("Host", "0.0.0.0:9090");
let _message = serde_json::json!({
"op": "subscribe",
"topic": "/client_count"
});
ws.connect()
.into_actor(self)
.map(|res, _act, ctx| match res {
Ok((client_response, frame)) => {
info!("Response: {:?}", client_response);
let (_r, w) = frame.split();
let _ = ctx.add_stream(w);
}
Err(err) => {
info!("Websocket Client Actor failed to connect: {:?}", err);
ctx.stop();
}
})
.wait(ctx);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
Running::Stop
}
}
impl StreamHandler<Result<Frame, ProtocolError>> for RosClient {
fn handle(&mut self, item: Result<Frame, ProtocolError>, _ctx: &mut Self::Context) {
match item.unwrap() {
Frame::Text(text_bytes) => {
let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
info!("Message: {}", text);
}
Frame::Binary(_) => {}
Frame::Continuation(_) => {}
Frame::Ping(_) => {
info!("Ping received!");
}
Frame::Pong(_) => {
self.hb = Instant::now();
}
Frame::Close(_) => {}
}
}
}
How do I keep a reference (or a handle, or a copy, or anything that does the job) to the connection, so that when I implement the message handlers, I can send data to the server via e.g.,
Message::Text(message.to_string())
After tweaking some things around, I got it working. Even from your previous question, the problem is how you are approaching the connection creation.
A good reference is in the crate actix-web-actors
where the pattern is something like:
pub fn start_with_addr<A, T>(
actor: A,
req: &HttpRequest,
stream: T
) -> Result<(Addr<A>, HttpResponse), Error>
In your case, this is what I came up with:
use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
use futures::stream::{SplitSink, SplitStream};
use futures_util::stream::StreamExt;
use log::{error, info};
use openssl::ssl::SslConnector;
type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
struct RosClient {
sink: SinkWrite<ws::Message, WsFramedSink>,
}
impl RosClient {
pub fn start(sink: WsFramedSink, stream: WsFramedStream) -> Addr<Self> {
RosClient::create(|ctx| {
ctx.add_stream(stream);
RosClient {
sink: SinkWrite::new(sink, ctx),
}
})
}
}
impl Actor for RosClient {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("RosClient started");
}
}
impl actix::io::WriteHandler<WsProtocolError> for RosClient {}
#[derive(Message, Debug)]
#[rtype(result = "()")]
struct Event {
op: String,
topic: String,
}
impl Handler<Event> for RosClient {
type Result = ();
fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
info!("Pushing Message {:?}", msg);
if let Some(error) = self
.sink
.write(ws::Message::Text(format!("{:?}", msg).into()))
{
error!("Error RosClient {:?}", error);
}
}
}
impl StreamHandler<Result<ws::Frame, WsProtocolError>> for RosClient {
fn handle(&mut self, item: Result<ws::Frame, WsProtocolError>, _ctx: &mut Self::Context) {
use ws::Frame;
match item.unwrap() {
Frame::Text(text_bytes) => {
let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
info!("Receiving Message: {}", text);
}
Frame::Binary(_) => {}
Frame::Continuation(_) => {}
Frame::Ping(_) => {
info!("Ping received!");
}
Frame::Pong(_) => {
//self.hb = Instant::now();
}
Frame::Close(_) => {}
}
}
}
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", "info");
env_logger::init();
let _ssl = {
let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
let _ = ssl.set_alpn_protos(b"\x08http/1.1");
ssl.build()
};
//let connector = awc::Connector::new().ssl(ssl).finish();
let (_, framed) = Client::default()
.ws("http://localhost:8080")
.connect()
.await?;
let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
let addr = RosClient::start(sink, stream);
let _res = addr
.send(Event {
op: format!("subscribe"),
topic: "/client_count".to_string(),
})
.await
.unwrap();
let _ = actix_rt::signal::ctrl_c().await?;
Ok(())
}
I wrote a simple node.js server (without ssl):
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function message(data) {
console.log('received: %s', data);
});
ws.send('something');
});
and it works perfectly:
[2021-12-01T07:08:03Z INFO actix-wc-client] RosClient started
[2021-12-01T07:08:03Z INFO actix-wc-client] Pushing Message Event { op: "subscribe", topic: "/client_count" }
[2021-12-01T07:08:03Z INFO actix-wc-client] Receiving Message: something
You may need to update some of actix-* versions. Here is my Cargo.toml
file:
[package]
name = "actix-wc-client"
version = "0.1.0"
edition = "2018"
[dependencies]
awc = "3.0.0-beta.9"
openssl = { version = "0.10" }
log = { version = "0.4" }
futures = "0.3"
actix = "0.11"
actix-web = "4.0.0-beta.10"
serde = "1"
serde_json = "1"
actix-codec = "0.4"
actix-rt = "2.5"
futures-util = "0.3"
actix-http = "3.0.0-beta.11"
env_logger = "0.7"