I have just started working with Spark Structured Streaming and have an implementation question.
I am using Apache Pulsar to stream data, and I want to know whether it is possible to run different queries in the same program and either join their results, or feed the result of one query into the other, without writing the intermediate result to another topic or sink.
For example, given this schema:
root
|-- amount: long (nullable = true)
|-- cardNum: string (nullable = true)
|-- category: string (nullable = true)
|-- merchant: string (nullable = true)
|-- ts: long (nullable = true)
|-- userId: string (nullable = true)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
|-- __messageProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
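For reference, CardTransactionDTO (not shown here) is a plain Java bean whose fields mirror the business columns of this schema. A minimal sketch, assuming boxed types for the nullable long columns and standard getters/setters:

import java.io.Serializable;

public class CardTransactionDTO implements Serializable {
    private Long amount;      // amount: long (nullable)
    private String cardNum;   // cardNum: string
    private String category;  // category: string
    private String merchant;  // merchant: string
    private Long ts;          // ts: long (nullable)
    private String userId;    // userId: string

    // Spark's javaBean encoder needs a no-arg constructor plus
    // getter/setter pairs for every field.
    public Long getAmount() { return amount; }
    public void setAmount(Long amount) { this.amount = amount; }
    public String getCardNum() { return cardNum; }
    public void setCardNum(String cardNum) { this.cardNum = cardNum; }
    public String getCategory() { return category; }
    public void setCategory(String category) { this.category = category; }
    public String getMerchant() { return merchant; }
    public void setMerchant(String merchant) { this.merchant = merchant; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
}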
The processor code:
public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {
        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");
        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream()
                .format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .load();
        lines.printSchema();

        Dataset<CardTransactionDTO> cardTransactionDTODataset =
                lines.as(ExpressionEncoder.javaBean(CardTransactionDTO.class));
        cardTransactionDTODataset.printSchema();

        // Top spends, merchant-wise
        cardTransactionDTODataset.groupBy("merchant")
                .agg(sum("amount").alias("amount"))
                .sort("merchant")
                .writeStream().outputMode("complete")
                .format("console")
                .start();

        // Top spends, category-wise
        cardTransactionDTODataset.groupBy("category")
                .agg(sum("amount").alias("amount"))
                .sort("category")
                .writeStream().outputMode("complete")
                .format("console")
                .start();

        // Block until any of the started queries terminates.
        sparkSession.streams().awaitAnyTermination();
    }
}
In the above sample, I'd like to understand how to either take the output of the first query and feed it into the second query, or join the results of both queries into a single DataFrame that I can then write to a sink, say a Pulsar topic itself.
Yes, it's possible using Dataset<T>. If you want to combine the output of two queries that both produce a Dataset<T>, you can combine them with the union method on Dataset. Note that union matches columns by position, not by name, so both sides must have compatible schemas; in the code below, the grouping column of each aggregation is renamed to a common label column first.
Try this updated code:
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.sum;

public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {
        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");
        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream()
                .format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .load();
        lines.printSchema();

        // Top spends, merchant-wise; rename the grouping column to a
        // common name so the union below lines up column-by-column.
        Dataset<Row> spendsByMerchant = lines.groupBy("merchant")
                .agg(sum("amount").alias("amount"))
                .withColumnRenamed("merchant", "label");

        // Top spends, category-wise
        Dataset<Row> spendsByCategory = lines.groupBy("category")
                .agg(sum("amount").alias("amount"))
                .withColumnRenamed("category", "label");

        // Union of rows from both Datasets; union matches columns by
        // position, so both sides are (label, amount). The aggregated
        // rows no longer carry all CardTransactionDTO fields, so the
        // union is written out directly instead of being converted back
        // to Dataset<CardTransactionDTO>. Sorting the combined result is
        // allowed here because the query runs in complete output mode.
        Dataset<Row> union = spendsByMerchant.union(spendsByCategory).sort("label");

        union.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        sparkSession.streams().awaitAnyTermination();
    }
}
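If you want the combined result to go to a Pulsar topic instead of the console, the same connector can be used as a sink. A minimal sketch, assuming the StreamNative pulsar-spark connector; the output topic name and checkpoint path are hypothetical placeholders:

// Serialize each row to JSON and write it to an output Pulsar topic.
// The topic name and checkpoint path below are placeholders; a
// checkpoint location is required for fault-tolerant sinks.
union.selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .outputMode("complete")
        .format("pulsar")
        .option("service.url", PULSAR_SERVICE_URL)
        .option("topic", "spark/tutorial/card-txn-agg")
        .option("checkpointLocation", "/tmp/card-txn-agg-ckpt")
        .start();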