In addition to that question, I created this example to integrate the DataStream API and the Table API. This time I get no errors, but I end up with two jobs instead of one: one job is created for the DataStream API and runs perfectly, and the other job is created for the Table API and also runs perfectly. The only issue is that the Table API job never receives any value from the DataStream API. Example:
/* FILTERING NULL IDs */
final SingleOutputStreamOperator<Event> stream_filtered = eventsStream
        .filter(new NullidEventsFilterFunction())
        .uid("id_filter_operator")
        .name("Event Filter");

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);

SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
Table source = fsTableEnv.fromDataStream(toTable);
source.execute(); /* without this line the Table API job is never started, but having it here changes nothing either */

DataStream<String> finalRes = fsTableEnv.toAppendStream(source, String.class);
finalRes.map((MapFunction<String, String>) value -> value)
        .name("Mapping after table")
        .addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) {
                LOG.info("Record from table: " + value);
            }
        }).name("Sink after map from table");

/* STARTING TRANSFORMATIONS */
Init.init(stream_filtered);
env.execute(job_name);
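For comparison, here is a stripped-down, self-contained sketch of the same round trip (DataStream -> Table -> DataStream) that I would expect to run as a single job. The Event class and my custom helpers are replaced by simple placeholders, and the planner settings are an assumption on my part (Flink 1.11-style APIs); the main difference from my code above is that everything hangs off one StreamExecutionEnvironment and only env.execute() is called:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RoundTripSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // The table environment is created from the SAME StreamExecutionEnvironment
        // that runs the DataStream part, so both share one topology
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<String> ids = env.fromElements("id-1", "id-2", "id-3");

        // DataStream -> Table -> DataStream, with no Table#execute() call;
        // the conversion stays inside the single job submitted below
        Table source = tEnv.fromDataStream(ids);
        DataStream<String> roundTripped = tEnv.toAppendStream(source, String.class);
        roundTripped.print();

        env.execute("round-trip-sketch");
    }
}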
By doing that, I can see this line in the logger:
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5], fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from DEPLOYING to RUNNING.
but no record is received or sent out.
See the image for the DataStream job and the image for the Table API job.
Any idea? Thanks in advance. Kind regards!