python apache-flink pyflink

window_all throwing an error with the PyFlink Kafka connector


I am trying to print the datastream by applying a tumbling processing-time window of 5 seconds. Since I couldn't implement a custom deserializer for now, I created a process function that returns the result as a tuple, and as per this documentation link I should be able to chain the process function with the windowing operation, so I tried this:

    def get_data(self):
        source = self.__get_kafka_source()
        ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source").window_all(
            TumblingProcessingTimeWindows.of(Time.seconds(5)))
        ds.process(ExtractingRecordAttributes(),
                   output_type=Types.TUPLE(
                       [Types.STRING(), Types.STRING(),
                        Types.STRING()])).print()
        self.env.execute("source")

    def __get_kafka_source(self):
        source = KafkaSource.builder() \
            .set_bootstrap_servers("localhost:9092") \
            .set_topics("test-topic1") \
            .set_group_id("my-group") \
            .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
            .set_value_only_deserializer(SimpleStringSchema()) \
            .build()
        return source

class ExtractingRecordAttributes(KeyedProcessFunction):

    def __init__(self):
        pass

    def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
        parts = UserData(*ast.literal_eval(value))
        result = (parts.user, parts.rank, str(ctx.timestamp()))
        yield result

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield "On timer timestamp: " + str(timestamp)
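(For context, `UserData` isn't defined above; it's just a small container for the parsed record. A self-contained sketch of the extraction step, assuming `UserData` is a namedtuple and the Kafka value arrives as a Python-literal string — both are assumptions, since neither is shown in the question:)

```python
import ast
from collections import namedtuple

# Hypothetical stand-in for the UserData class used in the question.
UserData = namedtuple("UserData", ["user", "rank"])

def extract(value: str):
    # SimpleStringSchema hands the record over as a plain string,
    # e.g. "('alice', '42')"; literal_eval turns it back into a tuple.
    parts = UserData(*ast.literal_eval(value))
    return (parts.user, parts.rank)

print(extract("('alice', '42')"))  # ('alice', '42')
```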

When I call the get_data method, it fails with the error below:

 return self._wrapped_function.process(self._internal_context, input_data)
AttributeError: 'ExtractingRecordAttributes' object has no attribute 'process'

    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
    ... 7 more  

If I don't use window_all, everything works fine. But the moment I introduce it, it fails. What am I doing wrong here? Any hints would be helpful.

I am using PyFlink 1.17.1.

TIA.


Solution

  • The problem is that for windowing you need a window function, not a plain process function: `window_all` produces a non-keyed stream, so the function passed to `process` must be a `ProcessAllWindowFunction` (a `ProcessWindowFunction` applies only after `key_by(...).window(...)`), not a `KeyedProcessFunction`.

    Ideally, your code should look like this:

    from typing import Iterable, Tuple
    
    from pyflink.datastream.functions import ProcessAllWindowFunction
    
    class ExtractingRecordAttributes(ProcessAllWindowFunction):
    
        def process(self,
                    context: 'ProcessAllWindowFunction.Context',
                    elements: Iterable[str]) -> Iterable[Tuple[str, str, str]]:
            for element in elements:
                parts = UserData(*ast.literal_eval(str(element)))
                yield parts.user, parts.rank, str(context.window().max_timestamp())
    
        def clear(self, context: 'ProcessAllWindowFunction.Context'):
            pass
    

    I haven't tested this, but I think it should work. Here is the reference from the documentation.
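As a side note on the 5-second window itself: with a zero offset, Flink assigns each record to the tumbling window whose start is `timestamp - (timestamp % size)` (this mirrors the Java `TimeWindow.getWindowStartWithOffset` logic). A plain-Python illustration of that bucketing, independent of the PyFlink API:

```python
WINDOW_SIZE_MS = 5_000  # 5-second tumbling window

def window_start(timestamp_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    # With offset 0, every record falls into the window
    # [start, start + size), where start is the nearest lower
    # multiple of the window size.
    return timestamp_ms - (timestamp_ms % size_ms)

# Records stay in the same bucket until a 5 s boundary passes.
print(window_start(12_301))  # 10000
print(window_start(14_999))  # 10000
print(window_start(15_000))  # 15000
```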