I'm new to Apache Storm and Kafka, and I'm trying to learn them through the courses provided by OpenClassroom. The principle is simple: messages are sent by a Python program to a Kafka server and are retrieved by a Kafka spout defined in the main class of a Storm topology. The problem is that I don't understand how the bolt retrieves the messages. From what I understand, this is done in the ParsingBolt class with the following line of code:
JSONObject obj = (JSONObject)jsonParser.parse(input.getStringByField("value"));
What I don't understand is how we know that the messages are contained in the value field. Below you can find the main class and the parsing bolt class. (The whole project is available here)
The main class:
package velos;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;

public class App {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutConfig.Builder<String, String> spoutConfigBuilder = KafkaSpoutConfig.builder("localhost:9092",
                "velib-stations");
        spoutConfigBuilder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "city-stats");
        KafkaSpoutConfig<String, String> spoutConfig = spoutConfigBuilder.build();
        builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
        builder.setBolt("station-parsing", new StationParsingBolt()).shuffleGrouping("stations");
        builder.setBolt("city-stats",
                new CityStatsBolt().withTumblingWindow(BaseWindowedBolt.Duration.of(1000 * 60 * 5)))
                .fieldsGrouping("station-parsing", new Fields("city"));
        builder.setBolt("save-results", new SaveResultsBolt()).fieldsGrouping("city-stats", new Fields("city"));
        StormTopology topology = builder.createTopology();
        Config config = new Config();
        config.setMessageTimeoutSecs(60 * 30);
        String topologyName = "Velos";
        if (args.length > 0 && args[0].equals("remote")) {
            StormSubmitter.submitTopology(topologyName, config, topology);
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topology);
        }
    }
}
The ParsingBolt:
package velos;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.shade.org.json.simple.JSONObject;
import org.apache.storm.shade.org.json.simple.parser.JSONParser;
import org.apache.storm.shade.org.json.simple.parser.ParseException;

public class StationParsingBolt extends BaseRichBolt {
    private OutputCollector outputCollector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            process(input);
        } catch (ParseException e) {
            e.printStackTrace();
            outputCollector.fail(input);
        }
    }

    public void process(Tuple input) throws ParseException {
        JSONParser jsonParser = new JSONParser();
        JSONObject obj = (JSONObject)jsonParser.parse(input.getStringByField("value"));
        String contract = (String)obj.get("contract_name");
        Long availableStands = (Long)obj.get("available_bike_stands");
        Long stationNumber = (Long)obj.get("number");
        outputCollector.emit(new Values(contract, stationNumber, availableStands));
        outputCollector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("city", "station_id", "available_stands"));
    }
}
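The field name comes from the Kafka spout itself. From the storm-kafka-client documentation (https://storm.apache.org/releases/2.4.0/storm-kafka-client.html):
By default the "topic", "partition", "offset", "key", and "value" will be emitted to the "default" stream.
So every tuple the spout emits carries the Kafka message body in its "value" field, and that is the field input.getStringByField("value") reads in the bolt. Use a RecordTranslator to change this.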
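To make the mechanism concrete, here is a minimal plain-Java sketch of what the default translation amounts to. It is only a model with no Storm dependency: the class DefaultTranslationSketch and its translate and getStringByField methods are hypothetical stand-ins written for illustration, not Storm's own classes. Only the five field names are taken from the documentation quoted above.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model (no Storm dependency) of how the spout's default
// translation pairs field names with the parts of a Kafka record.
public class DefaultTranslationSketch {
    // Field names the storm-kafka-client docs list for the "default" stream.
    static final List<String> FIELDS =
            Arrays.asList("topic", "partition", "offset", "key", "value");

    // Hypothetical stand-in for the translator: returns the tuple values
    // in the same order as FIELDS.
    static List<Object> translate(String topic, int partition, long offset,
                                  String key, String value) {
        return Arrays.<Object>asList(topic, partition, offset, key, value);
    }

    // Hypothetical stand-in for Tuple.getStringByField: find the position
    // of the field name, then read the value stored at that position.
    static String getStringByField(List<Object> tuple, String field) {
        int index = FIELDS.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        return (String) tuple.get(index);
    }

    public static void main(String[] args) {
        // A record as the Python producer might send it (key left null).
        List<Object> tuple = translate("velib-stations", 0, 42L, null,
                "{\"contract_name\":\"Lyon\"}");
        // prints {"contract_name":"Lyon"}
        System.out.println(getStringByField(tuple, "value"));
    }
}
```

In the real topology the same lookup happens inside Storm: the spout declares the field names, and the bolt asks for one of them by name. If you would rather read the message under another name, the spout config builder should let you supply your own translator and field list (as far as I know through setRecordTranslator on KafkaSpoutConfig.Builder), and the bolt would then call getStringByField with that name instead.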