I'm using nats-spark-connector ( https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced ) to connect to NATS JetStream, consume messages, and process them with Spark in Java. Below is the code snippet:
private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            // .config("spark.logConf", "false")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar"
            )
            // .config("spark.executor.instances", "2")
            // .config("spark.cores.max", "4")
            // .config("spark.executor.memory", "2g")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream3")
            .option("nats.stream.subjects", "mysub3")
            // wait 90 seconds for an ack before resending a message
            .option("nats.msg.ack.wait.secs", 90)
            // .option("nats.num.listeners", 2)
            // Each listener will fetch 10 messages at a time
            // .option("nats.msg.fetch.batch.size", 10)
            .load();
    System.out.println("Successfully read nats stream !");

    StreamingQuery query;
    try {
        query = df.writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", false)
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
As per the NATS JetStream walkthrough ( https://docs.nats.io/nats-concepts/jetstream/js_walkthrough ), I'm using the command below to publish messages to the stream (subject name: mysub3):

nats pub mysub3 --count=1000 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}"
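In case it helps with reproducing: the stream itself can be created with the CLI as in the walkthrough, or programmatically with jnats. Here's a minimal sketch of the latter (file-backed storage is my assumption; the stream and subject names match the ones used above):

import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;

// Creates the JetStream stream that the Spark job reads from
try (Connection nc = Nats.connect("nats://localhost:4222")) {
    JetStreamManagement jsm = nc.jetStreamManagement();
    StreamConfiguration sc = StreamConfiguration.builder()
            .name("my_stream3")
            .subjects("mysub3")
            .storageType(StorageType.File) // assumption: file-backed storage
            .build();
    jsm.addStream(sc);
}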
After publishing the messages to the NATS stream, the output of the code is:
Successfully read nats stream !
Status change nats: connection opened
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 4
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 5
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
+-------+---------------------------+------------------------------------------+
-------------------------------------------
Batch: 6
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime |content |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
|mysub3 |10/31/2023 - 18:04:36 +0530|publication #6 @ 2023-10-31T18:04:36+05:30|
|mysub3 |10/31/2023 - 18:04:37 +0530|publication #7 @ 2023-10-31T18:04:37+05:30|
+-------+---------------------------+------------------------------------------+
And it continues to print the same set of messages in every batch. Since I'm using outputMode("append"), I was expecting only newly published messages to be printed in each batch, but every batch also includes the messages that were already printed in previous batches. I tried outputMode("update") as well; it gives the same output as append.
How can I make sure each batch receives/prints only newly published messages?
Found a solution for this scenario! Messages were getting duplicated in every batch because they were never acknowledged to NATS after being consumed. I had to create a durable consumer and pass its name as an option in the Spark code.
Instructions to create a durable consumer: https://docs.nats.io/nats-concepts/jetstream/js_walkthrough#3.-creating-a-consumer
Info related to the durable consumer option: https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced
"nats.durable.name" Durable subscriptions allow clients to assign a durable name to a subscription when it is created. Doing this causes the NATS Streaming server to track the last acknowledged message for that clientID + durable name, so that only messages since the last acknowledged message will be delivered to the client. Obligatory configuration.
Pass this consumer name as an option using the key "nats.durable.name":
Dataset<Row> df = spark.readStream()
        .format("nats")
        .option("nats.host", "localhost")
        .option("nats.port", 4222)
        .option("nats.stream.name", "my_stream3")
        .option("nats.stream.subjects", "mysub3")
        // wait 90 seconds for an ack before resending a message
        .option("nats.msg.ack.wait.secs", 90)
        // .option("nats.num.listeners", 2)
        // Each listener will fetch 10 messages at a time
        // .option("nats.msg.fetch.batch.size", 10)
        .option("nats.durable.name", "my_consumer")
        .load();
This makes sure that only messages published since the last acknowledged message are delivered to the client.
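To verify that acknowledgments are actually happening, you can inspect the consumer state, e.g. with "nats consumer info my_stream3 my_consumer", or with a jnats sketch like the one below (my assumption on how to check: the ack floor should advance and the pending count should drop as batches are processed):

import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.api.ConsumerInfo;

// Prints the durable consumer's ack state
try (Connection nc = Nats.connect("nats://localhost:4222")) {
    JetStreamManagement jsm = nc.jetStreamManagement();
    ConsumerInfo info = jsm.getConsumerInfo("my_stream3", "my_consumer");
    System.out.println("ack floor   : " + info.getAckFloor());
    System.out.println("num pending : " + info.getNumPending());
}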