apache-kafka apache-flink stream-processing

Apache Flink job not printing anything to stdout and not sending to Kafka sink topic


I am experimenting with Apache Flink for a personal project, and I have been struggling to get the resulting stream printed to stdout and sent to the Kafka topic orders-output.

My goal is to calculate the sum of the price field per product in a tumbling time window of 3 minutes. Basically, the Apache Flink job receives orders as JSON-formatted strings from two Kafka source streams (orders-a and orders-b), merges them, and maps each order to a tuple of the shape (product_name, product_price (double)). After that it groups the tuples by product, applies the 3-minute tumbling window, and computes the sum of the price per product in that window using a ReduceFunction. Here's the code:


    FlinkKafkaConsumer<String> consumerA = new FlinkKafkaConsumer<>("orders-a", new SimpleStringSchema(), props);
    FlinkKafkaConsumer<String> consumerB = new FlinkKafkaConsumer<>("orders-b", new SimpleStringSchema(), props);

    DataStream<String> streamA = env.addSource(consumerA);
    DataStream<String> streamB = env.addSource(consumerB);

    DataStream<Tuple2<String,Double>> mergedOrders = streamA
            .union(streamB)
            .map(new MapFunction<String, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(String s) throws Exception {
                    return DataHelper.getTuple(s);
                }
            });

    DataStream<Tuple2<String, Double>> totals = mergedOrders
            .keyBy(value -> value.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .reduce(new ReduceFunction<Tuple2<String, Double>>() {
                public Tuple2<String, Double> reduce(Tuple2<String, Double> v1, Tuple2<String, Double> v2) {
                    return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                }
            });
    DataStream<String> result = totals.map(new MapFunction<Tuple2<String, Double>, String>() {
        @Override
        public String map(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
            LOG.info(stringDoubleTuple2.toString());
            return stringDoubleTuple2.toString();
        }
    });

    result.print();
    result.addSink(new FlinkKafkaProducer<>("orders-output", new SimpleStringSchema(), props));

    env.execute("Product revenues per 3m");

The code does what was previously described. DataHelper is just a custom helper class that I created to convert the received orders from JSON strings to Tuple2 and other types. The job runs fine once started (I am running everything locally), and I can even see in the Flink UI that the data is received (see the image below).

[Image: Flink UI Dashboard]

The issue is that I am not seeing any results in stdout (either in the terminal or in the Flink UI) or in the Kafka output topic (I started a consumer of orders-output independently in another terminal and I am not receiving anything).

I would appreciate some help on this as I've been stuck on it for two days.


Solution

  • I probably won't answer your question directly, but I might help you find what is wrong.

    First of all, FlinkKafkaProducer and FlinkKafkaConsumer are deprecated; use KafkaSink and KafkaSource instead. Second, I do not see a time strategy specified (event or processing time), but maybe that does not have to be stated explicitly (I am not sure, as I only use event time).

    To the problem: you can clearly see that data is coming into your last operator, which does the windowing, mapping and sinking. If you want to identify which of those functions is problematic, you can use custom chaining to turn each function into a standalone operator (see https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups).

    I can see that you added logging to the last map function before the sink, but nothing is being logged. If your logging configuration is right, this tells you that the data is stuck in the window function. The only reason that comes to mind is that Flink's time does not advance, so no windows are closed and the data is never emitted. You can add more functions to your pipeline, purely for debugging, to log the current time in your pipeline and see where the data is (or use the custom chains).

    You could add a function right after keyBy, before the window; after the window you are already logging elements. This way you can identify exactly where the data arrives but does not proceed further. Then log Flink's time. You can also try creating some timers and overriding the onTimer method to see whether time moves forward.

    One last thing: you can check the operator's metrics, which are accessible through the web UI. Check numRecordsOut (or something like that) for the last operator to see if it works. By the way, BytesSent will be zero for the sink operator, because it is not "sending" data to a next operator but sinking it.
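
To make the "time does not advance, so no window closes" hypothesis concrete, here is a minimal toy model in plain Java. It is not Flink code and does not use any Flink API: the class WatermarkWindowSketch and its methods onElement and onWatermark are hypothetical names invented for this illustration, and it tracks a single key for brevity. It only mimics the general rule that an event-time tumbling window emits its result once the watermark reaches the window's last timestamp, which is the behaviour the answer suspects is never triggered in the job above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (NOT Flink code) of event-time tumbling windows driven by a watermark. */
public class WatermarkWindowSketch {
    private final long windowSizeMs;
    // Window start -> running price sum for that window (one key only, for brevity).
    private final TreeMap<Long, Double> openWindows = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    public WatermarkWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers a record in the window containing its event timestamp. Emits nothing. */
    public void onElement(long eventTimeMs, double price) {
        long windowStart = eventTimeMs - (eventTimeMs % windowSizeMs);
        openWindows.merge(windowStart, price, Double::sum);
    }

    /** Advances event time: every window whose last timestamp (end - 1) is covered fires. */
    public void onWatermark(long watermarkMs) {
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSizeMs - 1 <= watermarkMs) {
            Map.Entry<Long, Double> w = openWindows.pollFirstEntry();
            long end = w.getKey() + windowSizeMs;
            emitted.add("[" + w.getKey() + "," + end + ") sum=" + w.getValue());
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public int openWindowCount() {
        return openWindows.size();
    }

    public static void main(String[] args) {
        WatermarkWindowSketch sketch = new WatermarkWindowSketch(5000);
        sketch.onElement(1000, 10.0);
        sketch.onElement(4000, 2.5);
        sketch.onElement(6000, 1.0);
        // Records were received, but without a watermark nothing is emitted.
        System.out.println("before watermark: " + sketch.emitted());
        sketch.onWatermark(4999);
        // Only the first window is complete; the second one stays open.
        System.out.println("after watermark:  " + sketch.emitted());
    }
}
```

In this model, records pile up in open windows and the output stays empty until onWatermark is called, which matches the symptom in the question: records are received, but nothing reaches print() or the sink. In a real Flink job the watermark is produced by a watermark strategy assigned to the stream rather than by a manual call, so checking whether one is assigned is a reasonable next debugging step.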