I have a topology that's receiving data from a MQTT broker, and I want a spout to behave like this:
Emit a batch of tuples (or a list of strings in a single tuple) every x seconds. How do I achieve this? I read a bit about Storm Trident but its IBatchSpout
doesn't seem to allow me to emit tuples in batch with a specific time interval.
What should the spout do if there's no new data coming in? It can't block the thread since it's Storm's main thread, right?
You could implement your own MQTT spout. For an example have a look at the MongoSpout.
The important part is the nextTuple
method.
When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be non-blocking, so if the Spout has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much CPU.
You must not wait the specified time at once, but you could implement nextTuple
so that it only emits a tuple once in a while.
private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;
@Override
public void nextTuple() {
if (lastEmission == null ||
lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
List<Object> tuple = pollMQTT();
if (tuple != null) {
this.collector.emit(tuple);
return;
}
}
Utils.sleep(50);
}
Note that I've found an open source MQTT spout. It doesn't look production ready, but you could use it as a starting point.