
I have a job with the DataStream API, which is running fine, but I need to take the DataStream&lt;Event&gt; resulting from a computation and pass it to the Table API to call a registered Python function, and then pass the result back to a new DataStream to reprocess the result of that call. I have two issues here. The first is that I can run the job like this:

/*DataStream Job*/
StreamExecutionEnvironment env = EnvironmentConfiguration.getEnv();
final DataStream<Event> eventsStream = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RabbitMQConsumer())
                .uid("cep.objects_mapper_id")
                .name("Event Mapper")
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp.getTime()))
                .name("Watermarks Added");

/*TableAPI job*/
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "python.exe");
fsTableEnv.getConfig().getConfiguration().setString("python.executable", "python.exe");
fsTableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");

SingleOutputStreamOperator<String> stream = eventsStream.map(x -> x.name);

Table source = fsTableEnv.fromDataStream(stream).as("name");
Table result = source.select("func1(name)");

DataStream<String> finalRes = fsTableEnv.toAppendStream(result, String.class);
finalRes.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) {
        LOG.info("Record from table: " + value);
    }
});

env.execute(job_name);

With this example I had no issues at all, but the Python function never returns; I'm afraid it will never be called until I do result.execute();. Then, if I take the same example from above and, after

finalRes.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) {
        LOG.info("Record from table: " + value);
    }
});

I call result.execute(); to execute the table, the Python function works, but the DataStream API job never gets to execute until the Table API job has finished. And since the DataStream API job is never initialized, the consumer doesn't work, so the stream that should be sent to the Table API (and then to the Python function) is always empty.

My question is: is there any way to run both jobs in parallel, or to run one after the other? Note: I did create a TimerTask that waits some time after the DataStream API job has started and then starts the Table API job (with parallelism 1). It seems to work, but the Table API job gets created and stopped too many times.
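
For context, the TimerTask workaround looks roughly like this (a minimal sketch; the executeAsync call and the 10-second delay are illustrative, not exactly what I have):

/*TimerTask workaround (sketch)*/
// Submit the DataStream job without blocking, then trigger the Table pipeline after a delay.
env.executeAsync(job_name);  // non-blocking job submission (available since Flink 1.10)

new java.util.Timer().schedule(new java.util.TimerTask() {
    @Override
    public void run() {
        result.execute();  // translates and runs the Table/Python pipeline
    }
}, 10_000L);  // arbitrary delay to give the DataStream job time to start consuming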

Is there any better way to do this? I hope someone understands my question.

Thanks!

  • Which version of Flink is this, and which Table/SQL planner are you using? – David Anderson Aug 28 '20 at 19:11
  • Hi, I'm using Flink 1.11, Scala version 2.11 and the Blink planner. See pom.xml dependencies: flink-table-api-java-bridge_2.11 and flink-table-planner-blink_2.11, both at ${flink.version}. – Alter Aug 29 '20 at 20:39
  • Full example of this integration here: https://stackoverflow.com/questions/62725535/using-python-user-defined-function-in-a-java-flink-job – Alter Oct 14 '20 at 13:17

0 Answers