apache-kafka, apache-flink

Error when a Flink program processes a Kafka topic: java.net.ConnectException: Connection refused (Connection refused)


I have a Flink program that must connect to a Kafka topic as a consumer and process it. I have already set up the Kafka server.properties:

  broker.id=0
  listeners=PLAINTEXT://localhost:9092
  log.dirs=/tmp/kafka-logs
  zookeeper.connect=localhost:2181
  num.network.threads=3
  num.io.threads=8
  socket.send.buffer.bytes=102400
  socket.receive.buffer.bytes=102400
  socket.request.max.bytes=104857600
  log.retention.hours=168
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=300000
  log.cleaner.enable=true

Then I first run ZooKeeper and then the Kafka server. Both run without any error. I check that port 9092 is open:

 nc -zv localhost 9092

The answer is:

 Connection to localhost (127.0.0.1) 9092 port [tcp/*] succeeded!
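
The nc probe only proves that the port accepts TCP connections. A deeper check, that the broker actually answers Kafka protocol requests, can be done with the plain kafka-clients AdminClient; the following is only a minimal sketch (the class name BrokerCheck is made up for illustration, and it assumes kafka-clients is on the classpath):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    import java.util.Properties;
    import java.util.Set;

    public class BrokerCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Request the topic list: this exercises the Kafka protocol,
            // not just the TCP handshake that nc verifies.
            try (AdminClient admin = AdminClient.create(props)) {
                Set<String> topics = admin.listTopics().names().get();
                System.out.println("Broker reachable, topics: " + topics);
            }
        }
    }

If this prints the topic list, the broker itself is reachable from the client machine.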

Moreover, in the Flink program I have these dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>

The program is the following:

  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.flink.api.common.functions.AggregateFunction;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import java.util.Properties;
  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.util.Collector;
  public class WordCount {
   public static void main(String[] args) throws Exception {
    // Set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Create a DataStream from a source
    DataStream<String> text = env.socketTextStream("localhost", 9999);
    // Process the stream
    DataStream<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            .sum(1);
    // Print the results to the standard output
    wordCounts.print();
    // Execute the job
    env.execute("Flink Streaming Word Count");
    // Kafka properties
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");
    // Create a Kafka consumer
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            "test",
            new SimpleStringSchema(),
            properties
    );
    // Add the consumer as a source
    DataStream<String> stream = env.addSource(consumer);
    // Parse the JSON transaction records and extract transaction amounts
    DataStream<Double> transactionAmounts = stream.map(record -> {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode jsonNode = objectMapper.readTree(record);
        return jsonNode.get("tranamount").asDouble();
    });
    // Calculate sum, count, and average over a sliding window of 1 minute
    DataStream<Tuple2<String, Double>> result = transactionAmounts
            .timeWindowAll(Time.minutes(1))
            .aggregate(new AggregateFunction<Double, Tuple2<Double, Integer>, Tuple2<String, Double>>() {
                @Override
                public Tuple2<Double, Integer> createAccumulator() {
                    return new Tuple2<>(0.0, 0);
                }
                @Override
                public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
                    return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
                }

                @Override
                public Tuple2<String, Double> getResult(Tuple2<Double, Integer> accumulator) {
                    double sum = accumulator.f0;
                    int count = accumulator.f1;
                    double average = sum / count;
                    return new Tuple2<>("Sum: " + sum + ", Count: " + count + ", Avg: " + average, average);
                }

                @Override
                public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            });

    // Print the result to the console
    result.print();

    // Execute the Flink job
    env.execute("Flink Kafka JSON Consumer Example");

   } 

  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Normalize and split the line into words
        String[] tokens = value.toLowerCase().split("\\W+");
        // Emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
   }
  }
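
For reference, the job expects JSON records with a tranamount field on the test topic. A minimal producer for such records, using the plain kafka-clients producer, could look like the sketch below (the class name TestTransactionProducer and the sample amount are made up for illustration):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class TestTransactionProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // One JSON record shaped like what the Flink job parses
                producer.send(new ProducerRecord<>("test", "{\"tranamount\": 42.5}"));
                producer.flush();
            }
        }
    }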

Everything seems normal, but when I run the program I receive this error:

  Caused by: java.net.ConnectException: Connection refused (Connection refused)

Would you please guide me on how to solve the problem?

Any help is really appreciated.


Solution

  • Problem solved. The issue was a version conflict between the Flink dependencies and the Kafka connector: flink-connector-kafka must use the same Flink version as the other Flink artifacts (1.13.2 here). The compatible set of dependencies is the following (a minimal smoke-test job to verify the fix is sketched after the list):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
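
With the versions aligned, a stripped-down job that only consumes the topic and prints each record is a quick way to confirm the connector reaches the broker before re-adding the windowed aggregation. This is only a minimal sketch reusing the broker address, group id, and topic from the question (the class name KafkaSmokeTest and the setStartFromEarliest() call are illustrative choices):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class KafkaSmokeTest {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Same broker address, group id, and topic as in the question
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "test");

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
            consumer.setStartFromEarliest(); // also read records already in the topic

            // If the dependency versions match, records should start printing here
            env.addSource(consumer).print();

            env.execute("Kafka connectivity smoke test");
        }
    }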