What is the best way to compare received data in Spark Streaming to existing data in HBase?
We receive data from Kafka as a DStream, and before writing it to HBase we must look up existing HBase data by the keys received from Kafka, do some calculation (new vs. old data per key) and then write the result back to HBase.
So if I receive a record (key, value_new), I must get (key, value_old) from HBase, so I can compare value_new vs value_old.
So the logic would be:
DStream from Kafka -> Query HBase by DStream keys -> Some calculations -> Write to HBase
My "naïve" approach was to use Phoenix Spark Connector to read and left join to new data based on key as a way to filter out keys not in the current micro-batch. So I would get a DF with (key, value_new, value_old) and from here I can compare inside partition.
JavaInputDStream<ConsumerRecord<String, String>> kafkaDStream = KafkaUtils.createDirectStream(...);
// use foreachRDD in order to use Phoenix DF API
kafkaDStream.foreachRDD((rdd, time) -> {
    // Get the singleton instance of SparkSession
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

    JavaPairRDD<String, String> keyValueRdd = rdd.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

    // TOO SLOW FROM HERE
    Dataset<Row> oldDataDF = spark
            .read()
            .format("org.apache.phoenix.spark")
            .option("table", PHOENIX_TABLE)
            .option("zkUrl", PHOENIX_ZK)
            .load()
            .withColumnRenamed("JSON", "JSON_OLD")
            .withColumnRenamed("KEY_ROW", "KEY_OLD");

    Dataset<Row> newDF = toPhoenixTableDF(spark, keyValueRdd); // just a helper method to get from RDD to DF (see note below)

    Dataset<Row> newAndOld = newDF.join(oldDataDF, oldDataDF.col("KEY_OLD").equalTo(newDF.col("KEY_ROW")), "left");

    // do some calcs based on new vs old values and then write to HBase ...
});
PROBLEM: getting data from HBase based on the list of keys from the received DStream RDD with the above approach is too slow for streaming.
What would be a performant way to do this?
Side note: the method toPhoenixTableDF is just a helper that transforms the received RDD into a DataFrame:
private static Dataset<Row> toPhoenixTableDF(SparkSession spark, JavaPairRDD<String, String> keyValueRdd) {
    JavaRDD<phoenixTableRecord> tmp = keyValueRdd.map(x -> {
        phoenixTableRecord record = new phoenixTableRecord();
        record.setKEY_ROW(x._1);
        record.setJSON(x._2);
        return record;
    });
    return spark.createDataFrame(tmp, phoenixTableRecord.class);
}
The solution is to use the Spark HBase connector for batch get and put operations.
You can find the source code, with good examples, at https://github.com/apache/hbase-connectors/tree/master/spark, as well as in the HBase documentation (Spark section).
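For illustration, here is a minimal sketch of the "simple wrapper" route, using JavaHBaseContext.bulkGet and bulkPut inside the same foreachRDD skeleton as in the question. The table name MY_TABLE, column family CF, qualifier JSON, the jssc streaming context and the computeNewValue(...) helper are placeholders for your own schema and calculation, not part of the connector.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Build the context once on the driver; it ships the HBase configuration to the executors.
Configuration hbaseConf = HBaseConfiguration.create(); // picks up hbase-site.xml from the classpath
JavaHBaseContext hbaseContext = new JavaHBaseContext(jssc.sparkContext(), hbaseConf); // jssc = your JavaStreamingContext

kafkaDStream.foreachRDD((rdd, time) -> {
    JavaPairRDD<String, String> newData =
            rdd.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

    // Batched GETs for exactly the keys of this micro-batch (100 Gets per round trip).
    JavaRDD<Result> oldRows = hbaseContext.bulkGet(
            TableName.valueOf("MY_TABLE"), 100, newData.keys(),
            key -> new Get(Bytes.toBytes(key)),
            result -> result);

    JavaPairRDD<String, String> oldData = oldRows
            .filter(r -> !r.isEmpty()) // keys with no existing row are dropped here
            .mapToPair(r -> new Tuple2<>(
                    Bytes.toString(r.getRow()),
                    Bytes.toString(r.getValue(Bytes.toBytes("CF"), Bytes.toBytes("JSON")))));

    // leftOuterJoin keeps new keys that have no old value; orNull() yields null for them.
    JavaRDD<Tuple2<String, String>> updated = newData.leftOuterJoin(oldData)
            .mapValues(v -> computeNewValue(v._1(), v._2().orNull())) // computeNewValue = your calculation (placeholder)
            .map(t -> t);

    // Batched PUTs back to the same table.
    hbaseContext.bulkPut(updated, TableName.valueOf("MY_TABLE"),
            t -> new Put(Bytes.toBytes(t._1()))
                    .addColumn(Bytes.toBytes("CF"), Bytes.toBytes("JSON"), Bytes.toBytes(t._2())));
});

The difference from the Phoenix DataFrame approach is that the read only touches the keys of the current micro-batch through batched Gets, instead of loading the whole table and letting Spark join it.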
This library uses the plain Java/Scala HBase API, so you keep control over the operations, but it manages the connection pool for you through an HBaseContext object broadcast to the executors, which is really great. It provides simple wrappers for HBase operations, but, if needed, you can just use its foreachPartition/mapPartitions methods and gain full control over the logic, while still having access to a managed connection.
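And a sketch of that lower-level route, reusing the hbaseContext built in the previous sketch: each partition gets a managed Connection, issues one multi-get for its keys, does the comparison locally and writes one multi-put. MY_TABLE / CF / JSON and mergeValues(...) are again placeholders.

// In addition to the imports above:
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Table;

kafkaDStream.foreachRDD((rdd, time) -> {
    JavaRDD<Tuple2<String, String>> keyValueRdd =
            rdd.map(record -> new Tuple2<>(record.key(), record.value()));

    // One managed HBase Connection per partition, provided by the HBaseContext.
    hbaseContext.foreachPartition(keyValueRdd, t -> {
        Table table = t._2().getTable(TableName.valueOf("MY_TABLE"));
        List<Tuple2<String, String>> batch = new ArrayList<>();
        t._1().forEachRemaining(batch::add);

        // One multi-get for all keys of this partition.
        List<Get> gets = new ArrayList<>(batch.size());
        for (Tuple2<String, String> kv : batch) {
            gets.add(new Get(Bytes.toBytes(kv._1())));
        }
        Result[] oldRows = table.get(gets);

        // Compare new vs old per key and collect the writes into one multi-put.
        List<Put> puts = new ArrayList<>(batch.size());
        for (int i = 0; i < batch.size(); i++) {
            byte[] oldBytes = oldRows[i].isEmpty() ? null
                    : oldRows[i].getValue(Bytes.toBytes("CF"), Bytes.toBytes("JSON"));
            String merged = mergeValues(batch.get(i)._2(), // mergeValues = your calculation (placeholder)
                    oldBytes == null ? null : Bytes.toString(oldBytes));
            puts.add(new Put(Bytes.toBytes(batch.get(i)._1()))
                    .addColumn(Bytes.toBytes("CF"), Bytes.toBytes("JSON"), Bytes.toBytes(merged)));
        }
        table.put(puts);
        table.close();
    });
});

Because the comparison happens inside each partition, there is no join and no shuffle; the per-partition multi-get and multi-put are the only HBase round trips.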