apache-flinkflink-streamingflink-cep

Using AsyncDataStream and RichAsyncFunction with SingleOutputStreamOperator


I'm aggregating over a keyed stream using a SingleOutputStreamOperator object in the form

stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...)

After I obtain the aggregates I would like to send each record to a REST endpoint using a POST request. To do this I've written a RichAsyncFunction. Unfortunately AsyncDataStream.unorderedWait(...) is incompatible with SingleOutputStreamOperator and instead needs a more generic DataStreams object.

The method unorderedWait(DataStream<IN>, AsyncFunction<IN,OUT>, long, TimeUnit, int) in the type AsyncDataStream is not applicable for the arguments (SingleOutputStreamOperator<Tuple6<String,String,Long,Long,Long,Long>>, AsyncFunction<String,String>, long, TimeUnit, int)

How can I use get the data stream from SingleOutputStreamOperator to use the RichAsyncFunction I've created. Or should I just use a process window function instead?


Solution

  • SingleOutputStreamOperator<T> extends DataStream<T>, so I don't think that's your problem.

    It looks like you've defined an AsyncFunction<String,String>, so it expects a String as input, but based on the error message you're passing it Tuple6<String,String,Long,Long,Long,Long>.