Tags: sockets, rust, udp, udpclient, rust-actix

UdpFramed with Actix Rust. Can't send messages using SinkWrite


I'm trying to write a UDP client actor using Actix. I followed the UDP-Echo example, but I can't seem to send a message to the server through tokio's UdpFramed struct. Here is what I have so far; this is the UDP client actor implementation:

use std::collections::HashMap;
use std::net::{SocketAddr};
use actix_rt::net::UdpSocket;
use actix::{Actor, Addr, AsyncContext, Context, Handler, StreamHandler, Message};
use actix::io::SinkWrite;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{SplitSink};
use futures_util::StreamExt;
use log::info;
use serde_json::Value;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use crate::rosclient::messages::Subscribe;
use std::io::Result;
mod messages;



/// Item written to the UDP sink: a payload paired with a socket address.
type SinkItem = (Bytes, SocketAddr);
/// Write half produced by splitting a `UdpFramed` socket that uses `BytesCodec`.
type UdpSink = SplitSink<UdpFramed<BytesCodec, UdpSocket>, SinkItem>;

/// Actor that holds the write half of a framed UDP socket together with the
/// address it sends subscribe requests to.
pub struct UdpClientActor {
    /// Address that the subscribe request is written to.
    pub address: SocketAddr,
    /// Actix `SinkWrite` wrapper around the split sink of the framed socket.
    pub sink: SinkWrite<SinkItem, UdpSink>,
}

impl UdpClientActor {
    /// Wraps `udp` in a `BytesCodec` framed transport and starts the actor.
    ///
    /// The read half is registered as the actor's stream of [`UdpPacket`]
    /// messages; the write half is stored in the actor as a `SinkWrite`.
    /// Returns the address of the newly created actor.
    pub fn start(udp: UdpSocket, address: SocketAddr) -> Addr<UdpClientActor> {
        let (writer, reader) = UdpFramed::new(udp, BytesCodec::new()).split();

        // Successful reads become `UdpPacket`s; failed reads are dropped.
        let packets = reader.filter_map(
            |received: Result<(BytesMut, SocketAddr)>| async move {
                match received {
                    Ok((payload, peer)) => Some(UdpPacket(payload, peer)),
                    Err(_) => None,
                }
            },
        );

        UdpClientActor::create(move |ctx| {
            ctx.add_stream(packets);

            UdpClientActor {
                address,
                sink: SinkWrite::new(writer, ctx),
            }
        })
    }
}

impl Actor for UdpClientActor {
    type Context = Context<Self>;

    /// Sends the actor a `Subscribe` message for the `/client_count` topic as
    /// soon as it has started.
    fn started(&mut self, ctx: &mut Self::Context) {
        let topic_entry = (
            String::from("topic"),
            Value::String(String::from("/client_count")),
        );
        let extra: HashMap<_, _> = std::iter::once(topic_entry).collect();

        ctx.notify(Subscribe {
            id: Default::default(),
            op: "subscribe".to_string(),
            extra,
        });
    }
}

/// Actor message carrying one item read from the framed UDP stream: the
/// payload bytes and the socket address delivered alongside them.
#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);

impl StreamHandler<UdpPacket> for
UdpClientActor {
    fn handle(&mut self, item: UdpPacket, _ctx: &mut Self::Context) {
        println!("Received: ({:?}, {:?})", item.0, item.1);
        self.sink.write((item.0.into(), item.1)).unwrap();
    }
}

impl actix::io::WriteHandler<std::io::Error> for UdpClientActor {}

impl Handler<Subscribe> for UdpClientActor {
    type Result = ();

    fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
        let js = serde_json::json!(msg).to_string();
        let _ = self.sink.write((Bytes::from(msg.to_string()), self.address));

        info!("Subscribing to topic {}", js);
    }
}

My main function creates the udp socket and spawns the actor.

fn main() {
    ////////////////////////////////////////////////////////////////////////////
   
    let fut = async {

        ////////////////////////////////////////////////////////////////////////////
        /////////// UDP_ACTOR
        let sock = tokio::net::UdpSocket::bind("0.0.0.0:9091").await.unwrap();
        let remote_addr = "172.30.89.169:9091".parse::<SocketAddr>().unwrap();
        // let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
        let _ = sock.connect(remote_addr).await;
        // sock.send(message).await.unwrap();

        let _udp_client = UdpClientActor::start(sock, remote_addr);
    };
    actix_rt::Arbiter::new().spawn(fut);
    // system.block_on(fut);
    system.run().unwrap();
}

If I remove the comments on

let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";

and

sock.send(message).await.unwrap();

With those two lines enabled, I can at least confirm that the server receives messages, so the problem must lie in my implementation of the actor. I do have another actor that uses LinesCodec instead of BytesCodec and otherwise follows the exact same implementation. The only difference is that the SinkWrite type becomes:

SinkWrite<(String, SocketAddr), SplitSink<UdpFramed<codec::LinesCodec>,
        (String, SocketAddr)>>

Here is my Cargo.toml for reference.

# Package manifest for the client application.
[package]
name = "local_websocket_client"
version = "0.1.0"
edition = "2018"

[dependencies]
actix="0.12"
actix-codec = "0.4"
actix-rt = "2.5"
bytestring = "1.0"
serde = {version="1.0", features=["serde_derive"]}
log = "0.4"
env_logger = "0.9.0"
chrono = "0.4"
dashmap = "4.0"
futures = "0.3"
openssl = "0.10"
tokio = { version = "1", features = ["full"] }
actix-web = "4.0.0-beta.15"
futures-util = "0.3"
# Source of the `UdpFramed` and `BytesCodec` types imported by the UDP actor.
tokio-util = { version="0.6", features=["net", "codec"] }
# NOTE(review): the UDP actor shown imports its socket and framing types from
# actix-rt and tokio-util, not from this crate — confirm it is still needed.
tokio-udp = "0.1.6"
# NOTE(review): the UDP actor shown takes Bytes/BytesMut from actix_web::web —
# confirm this version lines up with the one the rest of the stack uses.
bytes= { version="0.6", features=["serde"] }
[dependencies.awc]
features = ["openssl"]
version = "3.0.0-beta.9"

[dependencies.serde_json]
features = ["default"]
version = "1.0"

[dependencies.uuid]
features = ["v4", "serde", "v5"]
version = "0.8"

There are some extra crates there because I'm running 2 other websocket clients on the same application.

I would really appreciate some help on this matter. Thank you


Solution

  • Solved it by wrapping the UdpSocket in an Arc and keeping a reference to it in the actor for later use. Writing messages directly through the socket works. The split stream used for the StreamHandler needs no change, as it works as expected.