I am currently learning how to use apache-storm and apache-kafka. I've tried to solve the problem by looking for other dependencies to add to the pom, but all the solutions I found were related to apache-spark-streaming. You can find the code and the pom below.

The code:
package analytics;

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class App
{
    public static void main(String[] args) throws Exception, AlreadyAliveException, InvalidTopologyException, AuthorizationException
    {
        TopologyBuilder builder = new TopologyBuilder();

        KafkaSpoutConfig.Builder<String, String> spoutConfigBuilder = KafkaSpoutConfig.builder("localhost:9092", "velib-stations");
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "city-stats");
        spoutConfigBuilder.setProp(prop);
        KafkaSpoutConfig<String, String> spoutConfig = spoutConfigBuilder.build();

        builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
        builder.setBolt("station-parsing", new StationParsingBolt())
               .shuffleGrouping("stations");
        builder.setBolt("city-stats", new CityStatsBolt().withTumblingWindow(BaseWindowedBolt.Duration.of(1000 * 60 * 5)))
               .fieldsGrouping("station-parsing", new Fields("city"));
        builder.setBolt("save-results", new SaveResultsBolt())
               .fieldsGrouping("city-stats", new Fields("city"));

        StormTopology topology = builder.createTopology();

        Config config = new Config();
        config.setMessageTimeoutSecs(60 * 30);
        String topologyName = "velos";

        if (args.length > 0 && args[0].equals("remote")) {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
        else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topology);
        }
    }
}
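The three bolts referenced above (StationParsingBolt, CityStatsBolt, SaveResultsBolt) are not included in the question. For readers following along, here is a minimal, hypothetical sketch of what StationParsingBolt could look like; the field names and the naive string parsing are assumptions, and the only part the topology actually relies on is the declared "city" output field used by the fieldsGrouping calls.

package analytics;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical sketch: the real parsing logic is not shown in the question, so the
// "availableBikes" field name and the naive extraction below are assumptions. The
// topology wiring only requires that this bolt declares a "city" output field.
public class StationParsingBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The default KafkaSpout record translator emits the record payload under "value".
        String json = tuple.getStringByField("value");

        // Placeholder parsing: a real implementation would use a JSON library.
        String city = extractField(json, "city");
        String bikes = extractField(json, "available_bikes");
        if (city != null && bikes != null) {
            collector.emit(new Values(city, Integer.parseInt(bikes)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("city", "availableBikes"));
    }

    // Extremely naive "parser" used only to keep the sketch self-contained.
    private static String extractField(String json, String name) {
        int i = json.indexOf("\"" + name + "\"");
        if (i < 0) {
            return null;
        }
        int colon = json.indexOf(':', i);
        int end = json.indexOf(',', colon);
        if (end < 0) {
            end = json.indexOf('}', colon);
        }
        return json.substring(colon + 1, end).replace("\"", "").trim();
    }
}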
The pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>analytics</groupId>
  <artifactId>analytics</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>analytics</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <storm.version>2.4.0</storm.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-starter</artifactId>
        <version>2.4.0</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka-client</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
The missing class is packaged once you remove the storm-starter import from dependencyManagement, most likely because the dependency management it imports overrides the scope/version of the kafka-clients entry declared in the project, so the assembly plugin leaves it out of the shaded jar:
$ jar tf target/analytics-1.0-SNAPSHOT-jar-with-dependencies.jar | grep kafka | grep StringDeserializer
org/apache/kafka/common/serialization/StringDeserializer.class
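As an alternative to listing the jar contents, a quick runtime sanity check (a sketch, not part of the original answer) is to load the class reflectively with the shaded jar on the classpath; it fails fast with a ClassNotFoundException if kafka-clients did not make it into the assembly.

package analytics;

// Run with the jar-with-dependencies on the classpath; prints where the
// deserializer class was loaded from, or throws ClassNotFoundException if missing.
public class DeserializerCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> clazz = Class.forName("org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println("Found " + clazz.getName() + " in "
                + clazz.getProtectionDomain().getCodeSource().getLocation());
    }
}

The properties and dependencies that end up working: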
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
  <storm.version>2.4.0</storm.version>
  <storm.kafka.client.version>3.2.0</storm.kafka.client.version>
</properties>

<dependencies>
  <!-- provided -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- compile -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>${storm.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${storm.kafka.client.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <!-- tests -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
  </dependency>
</dependencies>
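As a side note on why kafka-clients has to end up inside the shaded jar at all: KafkaSpoutConfig.builder(bootstrapServers, topics) configures StringDeserializer for both keys and values by default, so the worker needs that class at runtime. The sketch below only makes those defaults explicit; the extra setProperty calls are not required, they just show where StringDeserializer comes into play.

package analytics;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class SpoutConfigSketch {

    public static KafkaSpoutConfig<String, String> velibSpoutConfig() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "city-stats");
        // The builder already defaults to StringDeserializer for a
        // KafkaSpoutConfig<String, String>; these two lines only make that visible
        // to show why the class must be inside the jar-with-dependencies.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        return KafkaSpoutConfig.builder("localhost:9092", "velib-stations")
                .setProp(props)
                .build();
    }
}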