As a follow-up to this question, I've created this example to integrate the DataStream API and the Table API. This time I get no error, but I have two jobs instead of one: one is created for the DataStream API and runs perfectly, and the other is created for the Table API and also runs perfectly. The only issue is that the Table API job never receives any value from the DataStream API job. Example:
/*FILTERING NULL IDs*/
final SingleOutputStreamOperator<Event> stream_filtered = eventsStream
        .filter(new NullidEventsFilterFunction())
        .uid("id_filter_operator")
        .name("Event Filter");

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
Table source = fsTableEnv.fromDataStream(toTable);
source.execute(); /*without this line the TableAPI job is not started, but nothing happens if is not there either*/
DataStream<String> finalRes = fsTableEnv.toAppendStream(source, String.class);
finalRes.map((MapFunction<String, String>) value -> value)
        .name("Mapping after table")
        .addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) {
                LOG.info("Record from table: " + value);
            }
        }).name("Sink after map from table");

/*STARTING TRANSFORMATIONS*/
Init.init(stream_filtered);
env.execute(job_name);
By doing that, I can see this line in the log:
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5], fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from DEPLOYING to RUNNING.
However, no records are received or sent out.
See the image for the DataStream job, and see the image for the Table API job.
Any idea? Thanks in advance. Kind regards!
If you want to write one job that starts and ends with the DataStream API, and uses the Table API in the middle, then here's a simple example you can build upon.
Note that the details involved have changed somewhat from release to release, and this particular example works as written with Flink 1.11. FLIP-136 (Improve interoperability between DataStream and Table API) is being worked on to make this even easier.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class BackAndForth {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Tuple2<String, Long>> rawInput = env.fromElements(
                new Tuple2<>("u2", 0L),
                new Tuple2<>("u1", 5L),
                new Tuple2<>("u2", 1L),
                new Tuple2<>("u3", 1L),
                new Tuple2<>("u1", 0L),
                new Tuple2<>("u1", 3L),
                new Tuple2<>("u2", 2L));

        // Convert the DataStream into a Table, naming the two tuple fields.
        Table events = tableEnv.fromDataStream(rawInput, $("userId"), $("value"));

        // Keep only the rows whose value is greater than 0.
        Table results = events
                .select($("userId"), $("value"))
                .where($("value").isGreater(0));

        // Convert the Table back into an append-only DataStream and print it.
        tableEnv
                .toAppendStream(results, Row.class)
                .print();

        // A single execute() call runs the whole pipeline as one job.
        env.execute();
    }
}
You may be concerned that the web UI shows "Records Sent: 0" and "Records Received: 0". This is very misleading. These Flink metrics only measure records and bytes flowing within Flink, and do not report any I/O with external systems. These metrics also do not report records and bytes flowing between operators that are chained together. Everything in those two jobs is chained, so records/bytes sent/received will always be zero in this case.
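To make the point about chaining more concrete, here is a toy model in plain Java. It is not Flink code: `ChainingMetricsSketch` and `Task` are hypothetical names invented for illustration. The only assumption it encodes is the one described above, namely that the sent/received counters are updated only when a record crosses a task boundary, and not when chained operators hand records to each other directly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy model only: none of these classes are Flink APIs. */
public class ChainingMetricsSketch {

    /** Hypothetical stand-in for one task, i.e. a chain of operators fused together. */
    static final class Task {
        final List<Function<String, String>> chainedOperators;
        Task downstream;          // next task, or null if this chain ends the pipeline
        long recordsReceived = 0; // incremented only for records arriving from another task
        long recordsSent = 0;     // incremented only for records handed to another task

        Task(List<Function<String, String>> chainedOperators) {
            this.chainedOperators = chainedOperators;
        }

        /** Entry point for records produced inside this task (e.g. by a chained source). */
        void process(String record) {
            String value = record;
            for (Function<String, String> operator : chainedOperators) {
                value = operator.apply(value); // direct call inside the chain, nothing is counted
            }
            if (downstream != null) {
                recordsSent++;
                downstream.receive(value);
            }
        }

        /** Entry point for records crossing a task boundary. */
        void receive(String record) {
            recordsReceived++;
            process(record);
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(" u1 ", " u2 ", " u3 ");
        Function<String, String> trim = String::trim;
        Function<String, String> upper = String::toUpperCase;

        // Case 1: every operator is chained into a single task.
        Task single = new Task(Arrays.asList(trim, upper));
        input.forEach(single::process);
        System.out.println("chained: sent=" + single.recordsSent
                + ", received=" + single.recordsReceived);

        // Case 2: the same operators split across two tasks.
        Task first = new Task(Arrays.asList(trim));
        Task second = new Task(Arrays.asList(upper));
        first.downstream = second;
        input.forEach(first::process);
        System.out.println("split: first sent=" + first.recordsSent
                + ", second received=" + second.recordsReceived);
    }
}
```

In this sketch the fully chained case reports zero for both counters even though three records were processed, while the split case reports three sent and three received. That mirrors why a fully chained job can look idle in the web UI while it is in fact doing work.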