
I am trying to read data from the database and run a process on each object concurrently.

My config is as below:

@Bean
public Job job() {
    return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer()).listener(new Listener(videoDao))
            .flow(step1()).end().build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<VideosDTO, VideosDTO>chunk(3)
            .reader(databaseVideoItemReader(null))
            .processor(new Processor())
            .writer(new Writer(videoDao))
            .build();
}



@Bean
@StepScope
ItemReader<VideosDTO> databaseVideoItemReader(@Value("#{jobParameters[userId]}") String userId) {
    logger.info("Fetching videos for userId:" + userId);
    JdbcCursorItemReader<VideosDTO> databaseReader = new JdbcCursorItemReader<>();
    databaseReader.setDataSource(dataSource);
    // Note the space before AND; without it the SQL becomes "user_id=<id>AND ..."
    databaseReader.setSql("SELECT * FROM voc.t_videos WHERE user_id=" + userId + " AND job_success_ind='N'");
    databaseReader.setRowMapper(new BeanPropertyRowMapper<>(VideosDTO.class));
    // Open the reader manually with a fresh ExecutionContext
    databaseReader.open(new ExecutionContext());

    return databaseReader;
}

My item processor is as below:

@Override
public VideosDTO process(VideosDTO videosDTO) throws Exception {
    log.info("processing........" + videosDTO.getVideoUrl());

    try {
        Process p = Runtime.getRuntime()
                .exec("C:\\Program Files\\Git\\bin\\bash.exe " + "D:\\DRM\\script.sh " + videosDTO.getVideoUrl());
        // .exec("D:\\PortableGit\\bin\\bash.exe
        // D:\\Vocabimate_Files\\script.sh "+videosDTO.getVideoUrl());
        // Thread.sleep(1000);
        Thread.sleep(1000);
        p.destroy();
        try {
            p.waitFor();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        try (InputStream is = p.getErrorStream()) {
            int in = -1;
            while ((in = is.read()) != -1) {
                System.out.print((char) in);
            }
        }
        try (InputStream is = p.getInputStream()) {
            int in = -1;
            while ((in = is.read()) != -1) {
                System.out.print((char) in);
            }
        }
    } catch (IOException e2) {
        // TODO Auto-generated catch block
        e2.printStackTrace();
    }

    return videosDTO;
}

The writer is as below:

@Override
public void write(List<? extends VideosDTO> videosList) throws Exception {

    for (VideosDTO vid : videosList) {
        log.info("writing...." + vid.getVideoUrl());
    }

}

Suppose 3 objects are fetched from the DB. This code first completes the process on the first object, then the second, then the third, and only then starts writing. I want to run the process on all three objects concurrently, and then perform the write operation.

Is there any way to do this?

2 Answers

Without getting into the details of your custom Reader/Processor/Writer, I think what you're looking for is a multi-threaded Step.

As also described in the documentation linked above, to make your step multi-threaded (meaning each chunk is read, processed, and written in a separate thread), you first need to register a SimpleAsyncTaskExecutor:

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("myAsyncTaskExecutor");
}

and then register this task executor in your Step's builder:

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<VideosDTO, VideosDTO>chunk(3)
            .reader(databaseVideoItemReader(null))
            .processor(new Processor())
            .writer(new Writer(videoDao))
            //making the Step multi-threaded
            .taskExecutor(taskExecutor())
            .build();
}
dimitrisli

Using a multi-threaded step, as suggested by @dimitrisli, is the way to go. In addition to that, another way is to use the AsyncItemProcessor (in combination with an AsyncItemWriter).
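
To illustrate the idea behind that combination, here is a minimal plain-Java sketch, not the Spring Batch classes themselves: each item of a chunk is submitted to a thread pool and yields a Future, and the results are only collected (and "written") once every item of the chunk has finished. The class name ConcurrentChunkSketch, the helper processChunk, the pool size of 3, and the sample file names are all assumptions made up for this example; the one-second sleep merely stands in for the external script call in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ConcurrentChunkSketch {

    // Hypothetical helper: runs `processor` on every item of one chunk in parallel
    // and returns the results, in the original order, once all items are done.
    public static <I, O> List<O> processChunk(List<I> chunk, Function<I, O> processor,
                                              ExecutorService pool) throws Exception {
        // "Async processor" part: submit each item and keep its Future
        List<Future<O>> futures = new ArrayList<>();
        for (I item : chunk) {
            Callable<O> task = () -> processor.apply(item);
            futures.add(pool.submit(task));
        }
        // "Async writer" part: unwrap the futures, blocking until each one completes
        List<O> results = new ArrayList<>();
        for (Future<O> future : futures) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Pool size matches the chunk size of 3 used in the question (an assumption)
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<String> chunk = List.of("video1.mp4", "video2.mp4", "video3.mp4");
            List<String> processed = processChunk(chunk, url -> {
                try {
                    Thread.sleep(1000); // stand-in for the slow external process
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return url + " (processed)";
            }, pool);
            // Writing starts only after all three items have been processed
            for (String item : processed) {
                System.out.println("writing...." + item);
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the three simulated tasks overlap on separate threads, the chunk takes roughly as long as its slowest item rather than the sum of all three. In Spring Batch, the AsyncItemProcessor and AsyncItemWriter play these two roles around your existing processor and writer, so the step's output type becomes a Future of the item.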

A similar use case (calling a rest endpoint asynchronously from the processor) can be found here: https://stackoverflow.com/a/52309260/5019386 where I gave some more details.

Hope this helps.

Mahmoud Ben Hassine