6

I want to perform some tasks after a Flink job is completed. I have no issues when I run the code in IntelliJ, but there are issues when I run the Flink jar from a shell script. I am using the line below to make sure that execution of the Flink program is complete:

    //start the execution
    JobExecutionResult jobExecutionResult = environment.execute("Started the execution");
    is_job_finished = jobExecutionResult.isJobExecutionResult();

I am not sure if the above check is correct or not.

Then I am using the above variable in the method below to perform some tasks:

    if (print_mode && is_job_finished) {

        System.out.println(" \n \n -- System related variables -- \n");

        System.out.println(" Stream_join Window length = " + WindowLength_join__ms + " milliseconds");
        System.out.println(" Input rate for stream RR  = " + input_rate_rr_S + " events/second");
        System.out.println(" Stream RR Runtime = " + Stream_RR_RunTime_S + " seconds");
        System.out.println(" # raw events in stream RR  = " + Total_Number_Of_Events_in_RR + "\n");
    }

Any suggestions?

Amarjit Dhillon

3 Answers


You can register a job listener on the execution environment.

For example

    env.registerJobListener(new JobListener {
      // Callback on job submission.
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUBMIT SUCCESS")
        } else {
          log.info("FAIL")
        }
      }

      // Callback on job execution finished, successfully or unsuccessfully.
      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })
mstzn

Register a JobListener to your StreamExecutionEnvironment.
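
Since the question is in Java, here is a sketch of the same idea with Flink's Java API (the pipeline itself is elided; this only shows where the listener hooks in, and assumes Flink 1.10+, where `JobListener` was introduced):

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            // Called when the job has been submitted to the cluster.
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (throwable != null) {
                    System.err.println("Job submission failed: " + throwable.getMessage());
                }
            }

            // Called when env.execute() finishes, successfully or not.
            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                if (throwable == null) {
                    // The job is done: this is a safe place to print your statistics.
                    System.out.println("Job finished in " + result.getNetRuntime() + " ms");
                }
            }
        });

        // ... build your pipeline on env, then:
        // env.execute("Started the execution");
    }
}
```

Note that the listener is only invoked for jobs submitted through `execute()`/`executeAsync()` on the environment it was registered on.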

mrgatto
  • Could you give an example on how to do it ? – Sylvaus Nov 17 '20 at 00:37
  • mstzn includes an example in scala. You can see more at https://github.com/apache/flink/blob/release-1.11/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java – mrgatto Nov 17 '20 at 13:16
  • My comment was to make your answer more helpful, simply directing to documentation may not be enough for the one asking the question to solve their problem. – Sylvaus Nov 17 '20 at 13:20

JobListener is a great mechanism, unless you use the SQL API.

If you use the SQL API, onJobExecuted will never be called. I have an idea you can refer to: the source is Kafka, and the sink can be of any type.

Let me explain it:

  • EndSign: a sentinel record that follows the last real record in each partition. When your Flink job has consumed it, the rest of that partition is empty.

Closing logic:

  • When your Flink job processes an EndSign, it calls the JobController, which increments its counter by 1.
  • When the JobController's counter equals the partition count, the JobController checks the consumer group lag to ensure the Flink job has received all the data.
  • Now we know the job is finished.
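
The counter logic above could be sketched in plain Java like this (all names here are hypothetical, since the answer provides no code; the consumer-lag check is left as a comment because it depends on your Kafka setup):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "JobController" described above:
// tracks how many partitions have delivered their EndSign sentinel.
public class JobController {
    private final int partitionCount;
    private final AtomicInteger endSignsSeen = new AtomicInteger(0);

    public JobController(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Called by the Flink job each time it processes an EndSign record.
    // Returns true once every partition has delivered its EndSign.
    public boolean onEndSign() {
        return endSignsSeen.incrementAndGet() >= partitionCount;
    }

    public boolean isJobFinished() {
        // A real implementation would additionally verify that the
        // Kafka consumer group lag is zero before declaring the job done.
        return endSignsSeen.get() >= partitionCount;
    }
}
```

With three Kafka partitions, `onEndSign()` returns false for the first two sentinels and true for the third.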