I have a Flink program that must connect to a Kafka topic as a consumer and process the messages. I have already configured Kafka's server.properties:
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=true
Then I start ZooKeeper first and afterwards the Kafka server. Both start without any error. I check that port 9092 is open:
nc -zv localhost 9092
The output is:
Connection to localhost (127.0.0.1) 9092 port [tcp/*] succeeded!
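Besides the raw port check, the broker itself can be verified with the scripts shipped in the Kafka distribution (the paths below assume a default tarball layout and Kafka 2.2+, where `--bootstrap-server` is supported):

```shell
# List existing topics: this confirms the broker answers Kafka-protocol
# requests, not just that the TCP port is open
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Create the "test" topic if it does not exist yet
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic test --partitions 1 --replication-factor 1
```

These commands require a running broker, so they are a diagnostic aid rather than something to script into the job itself.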
Moreover, in the Flink program I have these dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
The program is the following:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a DataStream from a source
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Process the stream
        DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // Print the results to the standard output
        wordCounts.print();

        // Execute the job
        env.execute("Flink Streaming Word Count");

        // Kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // Create a Kafka consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties
        );

        // Add the consumer as a source
        DataStream<String> stream = env.addSource(consumer);

        // Parse the JSON transaction records and extract transaction amounts
        DataStream<Double> transactionAmounts = stream.map(record -> {
            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode jsonNode = objectMapper.readTree(record);
            return jsonNode.get("tranamount").asDouble();
        });

        // Calculate sum, count, and average over a sliding window of 1 minute
        DataStream<Tuple2<String, Double>> result = transactionAmounts
                .timeWindowAll(Time.minutes(1))
                .aggregate(new AggregateFunction<Double, Tuple2<Double, Integer>, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<Double, Integer> createAccumulator() {
                        return new Tuple2<>(0.0, 0);
                    }

                    @Override
                    public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
                        return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
                    }

                    @Override
                    public Tuple2<String, Double> getResult(Tuple2<Double, Integer> accumulator) {
                        double sum = accumulator.f0;
                        int count = accumulator.f1;
                        double average = sum / count;
                        return new Tuple2<>("Sum: " + sum + ", Count: " + count + ", Avg: " + average, average);
                    }

                    @Override
                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                });

        // Print the result to the console
        result.print();

        // Execute the Flink job
        env.execute("Flink Kafka JSON Consumer Example");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Normalize and split the line into words
            String[] tokens = value.toLowerCase().split("\\W+");
            // Emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
Everything seems normal, but when I run the program I receive this error:
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Could you please guide me on how to solve this problem?
Any help is really appreciated.
Problem solved. The issue was a version conflict between the Flink core dependencies (1.13.2) and the Kafka connector (1.14.6). Aligning everything on 1.13.2 and adding a matching kafka-clients dependency fixed it. The compatible set of dependencies is the following:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
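As an aside, if one instead wants to keep the 1.14.x connector (rather than downgrading it to 1.13.2 as above), `FlinkKafkaConsumer` is deprecated in Flink 1.14 in favor of the `KafkaSource` builder. A minimal sketch, assuming the 1.14.x `flink-connector-kafka` dependency and the same local broker and `test` topic as in the question:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSource replaces FlinkKafkaConsumer in Flink 1.14+
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test")
                .setGroupId("test")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // fromSource is the new-style source API; no watermarks needed
        // for simple processing-time consumption
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        env.execute("KafkaSource example");
    }
}
```

Either way, the key point stands: the Flink core artifacts and the Kafka connector must come from the same Flink release line.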