I'm trying to hook up an Apache Spark Structured Stream to an MQTT topic (IBM Watson IoT Platform on IBM Bluemix in this case).
I'm creating the structured stream as follows:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(aWuFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
So far, so good; in the REPL I get back this df object:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
I've learned from this thread that I have to change the client ID every time I connect, so that part is solved. But if I start to read from the stream using this line:
val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
Then the resulting schema looks like this:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
And the data as follows:
This means my JSON stream is converted into a stream of String objects containing the JSON representation.
Is this a limitation of Apache Bahir?
Also, providing a schema doesn't help, since the following code produces the same result:
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("count", LongType, true) ::
  StructField("flowrate", LongType, true) ::
  StructField("fluidlevel", StringType, true) ::
  StructField("frequency", LongType, true) ::
  StructField("hardness", LongType, true) ::
  StructField("speed", LongType, true) ::
  StructField("temperature", LongType, true) ::
  StructField("ts", LongType, true) ::
  StructField("voltage", LongType, true) ::
  Nil)
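As a side note, the same schema can be written more compactly as a DDL string; a sketch, assuming Spark 2.3+ where StructType.fromDDL is available:

import org.apache.spark.sql.types.StructType

// Equivalent to the StructField list above; DDL fields are nullable by default
val schema = StructType.fromDDL(
  "count LONG, flowrate LONG, fluidlevel STRING, frequency LONG, " +
  "hardness LONG, speed LONG, temperature LONG, ts LONG, voltage LONG")

Either form yields the same result when passed to readStream: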
val df = spark.readStream
.schema(schema)
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(a8GFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
Many data sources, including but not limited to MQTTStreamSource, have a fixed schema, which consists of a message and a timestamp. The schema is not lost; it is simply not parsed, and this is expected behavior.
If the schema is fixed and known up front, you should be able to use the from_json function:
import org.apache.spark.sql.functions.from_json

// Parse the JSON in the value column using the known schema
df.withColumn("value", from_json($"value", schema))
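Putting it together, a minimal end-to-end sketch, assuming the schema defined in the question and the usual spark.implicits._ in scope for the $"..." column syntax:

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Parse the JSON string into a struct, then flatten its fields
// into top-level columns next to the source timestamp
val parsed = df
  .withColumn("value", from_json($"value", schema))
  .select($"timestamp", $"value.*")

val query = parsed.writeStream
  .outputMode("append")
  .format("console")
  .start()

Note that from_json returns null for rows that cannot be parsed with the given schema, so malformed messages surface as null fields rather than as errors.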