apache-flink flink-streaming pyflink

Use cases for the DataStream API in PyFlink


I use Flink in Python to process streaming data from Kafka. However, in all scripts where the DataStream API is used instead of the Table API, I see raw Python code being implemented, cf. this example from the official tutorial here:

from pyflink.common import Types

def split(line):
    yield from line.split()

# compute word count: tokenize, pair each word with 1, key by word, sum counts
ds = ds.flat_map(split) \
       .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
       .key_by(lambda i: i[0]) \
       .reduce(lambda i, j: (i[0], i[1] + j[1]))
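
For context, here is roughly how such a snippet sits inside a runnable job (a minimal sketch; I'm using an in-memory collection as a stand-in for my actual Kafka source):

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# stand-in source; in my real jobs this is a Kafka source instead
ds = env.from_collection(["to be or not to be"], type_info=Types.STRING())

# ... the word count transformation from above ...

ds.print()
env.execute("word_count")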

In which use cases should the DataStream API be used over the Table API? Given the performance differences (crossing the Java/Python barrier, optimized execution of predefined functions, ...; cf. this question), I would only use the DataStream API if there is absolutely no way to implement my use case in the Table API. Am I misunderstanding something here?


Solution

  • In which use cases should the DataStream API be used over the Table API? ... I would only use the DataStream API if there is absolutely no way to implement my use case in the Table API. Am I misunderstanding something here?

    I would say that, in general, the guidance in David's answer on the post you shared is correct. Python support within Flink is pretty anemic compared to its more fully-featured Java counterpart, and I think the same holds for the Table API as a whole compared to the DataStream API, flexibility-wise.

    Performance-wise, if something makes sense for and can be implemented in the Table API, I'd recommend doing so; that covers most simple ETL-type operations. However, if you require more complex operations, interactions with state, or in general implementations that call for a general-purpose language (e.g. Java or Python), the DataStream API is going to be the far more flexible choice. A sketch of what that looks like follows below.
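
    As an illustration of the kind of thing that's awkward to express in the Table API, here's a rough sketch of a keyed, stateful running count using a KeyedProcessFunction with ValueState (the class name, stream shape, and types here are just for illustration):

        from pyflink.common import Types
        from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
        from pyflink.datastream.state import ValueStateDescriptor

        class RunningCount(KeyedProcessFunction):

            def open(self, runtime_context: RuntimeContext):
                # per-key managed state holding the running count
                self.count = runtime_context.get_state(
                    ValueStateDescriptor("count", Types.LONG()))

            def process_element(self, value, ctx):
                current = (self.count.value() or 0) + 1
                self.count.update(current)
                yield value[0], current

        # applied to a keyed stream of (word, ...) tuples
        counts = ds.key_by(lambda i: i[0]).process(
            RunningCount(),
            output_type=Types.TUPLE([Types.STRING(), Types.LONG()]))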

    It's also worth noting that recent releases of Flink allow interop between the Table and DataStream APIs, so you can take advantage of Table API calls within a DataStream application. I'd recommend checking out the DataStream API Integration docs for a bit more information on this.
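
    For a flavor of what that interop looks like in PyFlink (a minimal sketch; the stream contents and column names are made up):

        from pyflink.common import Types
        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.table import StreamTableEnvironment
        from pyflink.table.expressions import col

        env = StreamExecutionEnvironment.get_execution_environment()
        t_env = StreamTableEnvironment.create(env)

        ds = env.from_collection(
            [("hello", 1), ("flink", 2)],
            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

        # hand the stream to the Table API for a declarative step...
        table = t_env.from_data_stream(ds).alias("word", "cnt")
        filtered = table.filter(col("cnt") > 1)

        # ...then drop back into the DataStream API where more flexibility is needed
        result = t_env.to_data_stream(filtered)
        result.print()
        env.execute("table_datastream_interop")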