Tags: apache-flink, flink-streaming, flink-cep

Persist Apache Flink window


I'm trying to use Flink to consume bounded data from a message queue in a streaming fashion. The data will be in the following format:

{"id":-1,"name":"Start"}
{"id":1,"name":"Foo 1"}
{"id":2,"name":"Foo 2"}
{"id":3,"name":"Foo 3"}
{"id":4,"name":"Foo 4"}
{"id":5,"name":"Foo 5"}
...
{"id":-2,"name":"End"}

The start and end of a batch can be determined using the event id. I want to receive such batches and keep only the latest one, overwriting the previous batch on disk or in memory. I can write a custom window trigger to extract the events using the start and end flags, as shown below:

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

import java.util.Iterator;

DataStream<Foo> fooDataStream = ...
AllWindowedStream<Foo, GlobalWindow> fooWindow = fooDataStream.windowAll(GlobalWindows.create())
    .trigger(new CustomTrigger<>())
    .evictor(new Evictor<Foo, GlobalWindow>() {
        @Override
        public void evictBefore(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
            // Drop the Start/End marker events (negative ids) before the window function runs.
            for (Iterator<TimestampedValue<Foo>> iterator = elements.iterator(); iterator.hasNext(); ) {
                TimestampedValue<Foo> foo = iterator.next();
                if (foo.getValue().getId() < 0) {
                    iterator.remove();
                }
            }
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
            // No eviction needed after the window function.
        }
    });
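For completeness, the CustomTrigger referenced above is roughly the following sketch: it fires and purges the window once the End marker (id -2 in the sample data) arrives.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CustomTrigger<T extends Foo> extends Trigger<T, GlobalWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // Fire and purge once the End marker (id == -2) arrives, emitting the batch.
        return element.getId() == -2
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // Nothing to clean up; no timers or custom state are registered.
    }
}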

But how can I persist the output of the latest window? One way would be to use a ProcessAllWindowFunction to receive all the events and write them to disk manually, but that feels like a hack (see the sketch below). I'm also looking into the Table API with a Flink CEP Pattern (like this question), but I couldn't find a way to clear the Table after each batch to discard the events from the previous batch.
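To be concrete, the ProcessAllWindowFunction approach would look roughly like this sketch, where the class name and output path are placeholders and the job is assumed to run on a single machine so a local path makes sense:

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Sketch only: collects each fired batch and overwrites one local file with it.
public class PersistLatestBatch extends ProcessAllWindowFunction<Foo, Foo, GlobalWindow> {

    @Override
    public void process(Context context, Iterable<Foo> elements, Collector<Foo> out) throws Exception {
        List<String> lines = new ArrayList<>();
        for (Foo foo : elements) {
            lines.add(foo.getId() + "," + foo.getName());
            out.collect(foo); // keep forwarding events downstream
        }
        // Overwrites the previous batch; "/tmp/latest-batch.csv" is a placeholder path.
        Files.write(Paths.get("/tmp/latest-batch.csv"), lines);
    }
}

// Usage: fooWindow.process(new PersistLatestBatch());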


Solution

There are a couple of things getting in the way of what you want:

(1) Flink's window operators produce append streams, rather than update streams. They're not designed to update previously emitted results. CEP also doesn't produce update streams.

(2) Flink's file system abstraction does not support overwriting files. This is because object stores, like S3, don't support this operation very well.

I think your options are:

(1) Rework your job so that it produces an update (changelog) stream. You can do this with toChangelogStream, or by using Table/SQL operations that create update streams, such as GROUP BY (when it's used without a time window). On top of this, you'll need to choose a sink that supports retractions/updates, such as a database. (See the first sketch below.)

(2) Stick to producing an append stream, and use something like the FileSink to write the results to a series of rolling files. Then do some scripting outside of Flink to extract what you want from them. (See the second sketch below.)
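Here's a minimal sketch of option (1), reusing fooDataStream from the question. The view name and query are illustrative (any aggregation without a time window yields an update stream), and it assumes Flink 1.13+, where toChangelogStream is available:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the stream from the question as a table.
tableEnv.createTemporaryView("foos", fooDataStream);

// An aggregation without a time window produces an update (changelog) stream.
Table counts = tableEnv.sqlQuery("SELECT id, COUNT(*) AS cnt FROM foos GROUP BY id");

// Each Row is tagged with a RowKind (+I, -U, +U, -D); an upsert-capable sink,
// such as a JDBC sink with a primary key, can materialize the latest state.
DataStream<Row> changelog = tableEnv.toChangelogStream(counts);
changelog.print();

And a sketch of option (2) with the FileSink; the output directory is a placeholder, and a script outside Flink would pick up the newest complete part files:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Append each Foo as a CSV line to rolling part files under the given directory.
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/tmp/flink-batches"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

fooDataStream
    .map(foo -> foo.getId() + "," + foo.getName())
    .sinkTo(sink);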