I am trying out AWS IoT Core, and it seems that when you publish a message to a topic, it is delivered to every device listening on that topic, even though I selected AtMostOnce as the QoS.
Is it possible to deliver the message to only one of the instances subscribed to the topic? I am looking for something similar to SQS here. Here is the PoC I have written in Rust, along with its output:
async fn run_client(
    client_mailbox: String,
    aws_settings: AWSIoTSettings,
    subscribe_topic: String,
) -> Result<AWSIoTAsyncClient, Box<dyn Error>> {
    let (iot_core_client, eventloop_stuff) = AWSIoTAsyncClient::new(aws_settings).await?;
    iot_core_client.subscribe(subscribe_topic.clone(), QoS::AtMostOnce).await.unwrap();
    let mut receiver = iot_core_client.get_receiver().await;
    // Spawn a task to handle incoming messages
    task::spawn(async move {
        loop {
            if let Ok(event) = receiver.recv().await {
                match event {
                    Packet::Publish(p) => {
                        println!("Received message. destination topic:{:?}, receiver mailbox: {:?}, message: {:?}", p.topic, client_mailbox, p.payload)
                    }
                    _ => (),
                }
            }
        }
    });
    // Spawn a task to handle the event loop
    task::spawn(async move {
        async_event_loop_listener(eventloop_stuff).await.unwrap();
    });
    Ok(iot_core_client)
}
Here is the test code I have written:
println!("Connecting mailboxes");
let client1 = run_client("client1".to_string(), client1_settings, "client1".to_string()).await?;
// Both service instances subscribe to the same "service" topic
let service1 = run_client("service1".to_string(), service1_settings, "service".to_string()).await?;
let service2 = run_client("service2".to_string(), service2_settings, "service".to_string()).await?;
// Give the subscriptions time to complete without blocking the async runtime
tokio::time::sleep(Duration::from_secs(2)).await;
println!("\nClient sending message to backend");
client1.publish("service".to_string(), QoS::AtMostOnce, "Hello from client1".to_string()).await?;
Here is the output I get:
Clients sending message to backend
Received message. destination topic:"service", receiver mailbox: "service1", message: b"Hello from client1"
Received message. destination topic:"service", receiver mailbox: "service2", message: b"Hello from client1"
I want only one service instance to receive the message, and not the others, since it has already been delivered to one of my backend service instances. How can I do that?
MQTT delivers messages to ALL subscribers to a given topic under normal circumstances.
MQTT v5 added the concept of shared subscriptions to the specification: a group of clients can be defined, and any given message is delivered to only one member of that group. This is done by prefixing the topic filter used in the subscription with $share/[group-name]/
e.g. for a topic foo/bar
and a group of test
the subscription topic becomes $share/test/foo/bar
Publishers still publish to the plain topic foo/bar; only the subscribers use the $share prefix.
This is an optional feature in the specification and the broker needs to support it for it to be used.
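As a rough sketch of how the prefix could be built on the subscriber side, the helper below assembles a shared-subscription filter from a group name and a topic filter. The function name `shared_filter` and the group name `backend` are hypothetical and not part of any SDK; the only rule it enforces is the one from the MQTT v5 specification that the group name must be non-empty and must not contain `/`, `+` or `#`.

```rust
/// Builds an MQTT v5 shared-subscription filter of the form
/// `$share/<group>/<topic_filter>`.
///
/// Hypothetical helper for illustration; not part of any AWS or MQTT crate.
/// Returns `None` when the group name is empty or contains `/`, `+` or `#`,
/// or when the topic filter is empty.
fn shared_filter(group: &str, topic_filter: &str) -> Option<String> {
    let group_is_valid =
        !group.is_empty() && !group.chars().any(|c| matches!(c, '/' | '+' | '#'));
    if !group_is_valid || topic_filter.is_empty() {
        return None;
    }
    Some(format!("$share/{}/{}", group, topic_filter))
}
```

In the question's test code, both service instances would then pass `shared_filter("backend", "service")` (unwrapped) as `subscribe_topic`, while `client1` keeps publishing to the plain `service` topic. This assumes the client wrapper forwards the filter string to the broker unchanged and that the broker has shared subscriptions enabled.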