1

Introduction

I am trying to use jobparameters created in a tasklet to create steps following the execution of the tasklet.

A tasklet tries to finds some files (findFiles()) and if it finds some files it saves the filenames to a list of strings.

In the tasklet I pass the data as following: chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);

The next step is a parallel flow where for each file a simple reader-processor-writer step will be executed (if you are interested in how I got there please see my previous question: Spring Batch - Looping a reader/processor/writer step)

Upon building the job readFilesJob() a flow is created initially using a "fake" list of files because only after the tasklet has been executed the real list of files is known.

Question

How do I configure my job so the tasklet gets executed first and then the parallel flow gets executed using the list of files generated from the tasklet?

I think it comes down to getting the list of filenames loaded with the correct data at the correct moment during runtime... but how?

Reproduce

Here is my simplified configuration:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());

        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);

        Flow flow = flowBuilder
                .start(findFiles())             
                .next(createParallelFlow(steps))
                .build();       

        return jobBuilderFactory.get("readFilesJob")                
                .start(flow)                
                .end()
                .build();
    }

    private static Flow createParallelFlow(List<Step> steps){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream()
                .map(step ->
                        new FlowBuilder<Flow>("flow_" + step.getName()) 
                        .start(step) 
                        .build()) 
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) 
             .add(flows.toArray(new Flow[flows.size()]))
             .build();      
    }

    private Step createStep(String fileName){
        return stepBuilderFactory.get("readFile" + fileName)
                .chunk(100)
                .reader(reader(fileName))               
                .writer(writer(filename))                               
                .build();
    }

    private FileFinder findFiles(){
        return new FileFinder();
    }
}

Research

The question and answer from How to safely pass params from Tasklet to step when running parallel jobs suggest the usage of a construct like this in the reader/writer:

@Value("#{jobExecutionContext[filePath]}") String filePath

However, I really hope it is possible to pass the fileName as a string to the reader/writer due to the way the steps are created in the createParallelFlow() method. Therefore, even tho the answer to that question might be a solution for my problem here, it is not the desired solution. But please do not refrain from correcting me if I am wrong.

Closing

I am using the file names example to clarify the problem better. My problem is not actually the reading of multiple files from a directory. My question really boils down to the idea of generating data during runtime and passing it to the next dynamically generated step(s).

EDIT:

Added a simplified tasklet of the fileFinder.

@Component
public class FileFinder implements Tasklet, InitializingBean {

    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the filenames and store dem in the list
        fileNames.add("sample-data1.csv");
        fileNames.add("sample-data2.csv");
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Execution of methods that will find the file names and put them in the list...
        chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);                     
        return RepeatStatus.FINISHED;
    }    
}
Community
  • 1
  • 1
Sander_M
  • 1,109
  • 2
  • 18
  • 36

1 Answers1

2

I'm not sure, if I did understand your problem correctly, but as far as I see, you need to have the list with the filenames before you build your job dynamically.

You could do it like this:

@Component
public class MyJobSetup {
    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the filenames and store dem in the list
        fileNames = ....;
    }
}

After that, you can inject this Bean inside your JobConfiguration Bean

@Configuration
@EnableBatchProcessing
@Import(MyJobSetup.class)
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    private  MyJobSetup jobSetup; // <--- Inject
          // PostConstruct of MyJobSetup was executed, when it is injected

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = jobSetUp.getFileNames() // get the list of files
             .stream() // as stream
             .map(file -> createStep(file)) // map...
             .collect(Collectors.toList()); // and create the list of steps
Hansjoerg Wingeier
  • 4,274
  • 4
  • 17
  • 25
  • Again, thank you for your quick answer and help, it is very helpful. The fileFinder is actually a tasklet that executes some methods and when its done and found files it saves the filenames in a list. In my actual application some dynamic SQLs are created that need to be passed to multiple readers-writers. As such the fileFinder as it is configured now is part of the job. I have edit the question with an example tasklet that closely resembles my actual SQL creator tasklet. I hope this clarifies my question. Is it possible to do it like this? – Sander_M May 19 '16 at 09:43
  • I don't see the reason why you want to put the FileFinder in the job itself (as tasklet) but that is probably because I don't fully understand the problem you need to solve. When you build your job in your "BatchConfiguration" class, you should have all the information that is necessary to build the job completely. You should exactly know, how many steps you'll have, what the filenames for all the FileReaders are, how the SQLs für your JDBCItemWriter look, etc. Even if the job is constructed dynamically and depends, for instance, on the number of files present in a directory at runtime. – Hansjoerg Wingeier May 19 '16 at 11:01
  • The application generates SQL statements and after that it extracts data using these SQLs. My initial idea was to make that into one job that can be launched. I got it to work using the JobExecutionDecider, but unfortunately that just executes the reader-proc-writer in a loop and is not scalable for many SQLs. Perhaps the BatchConfiguration class should contain two individual jobs. First the generateSQL job and after that the extractData job. After the execution of the first job, all information is present to dynamically generate the second job. What is your opinion on that approach? – Sander_M May 19 '16 at 11:27
  • 1
    Why do you want to put the SQL statement generation into a job? You can do this, as suggested above, before you build your job. And then, when you have created/calculated your SQL statments, you can dynamically create your job and add a step for every calculated SQL, I mean, you are in the springcontext, all instantiated beans are ready and can be injected into your BatchConfiguration class. Are there any features inside a Job, that would make the SQL statement generation significantly easier than doing it in a pure spring bean? – Hansjoerg Wingeier May 19 '16 at 11:36
  • I put the SQL statement generation into the job as a tasklet so that I could launch the application as a whole as a spring boot app. I dont really know how to make that tasklet into a "normal" class (perhaps the pure spring bean that you refer to?) and use that class in the job without the class being a tasklet. So, to answer your last question: No, there are no features in the Job that make the SQL statement generation easier. Ideally I launch the spring boot app and the SQL generation class just gets executed first (constructed and methods executed) and after that the the rest r/p/w steps. – Sander_M May 19 '16 at 11:55
  • 1
    You do not have to start the application differently. In order to make your Tasklet a "normal" spring bean, just remove the 'Tasklet' after 'implements'. And then inject it with '@Autowired' in your BatchConfiguration-class. Depending on how your context is configured, you probably have to add an '@Import' annotation (see my edited example above). That's it. The spring context takes care that your SQL Generation class (formerly known as FileFinder-Tasklet) is executed first and that its PostConstruct method is called before it is injected into the BatchConfiguration class. – Hansjoerg Wingeier May 19 '16 at 12:07
  • Great answers, thanks. I will try and hack it together (later) and accept the answer(s). Really beautiful how the java config works in comparison to xml configuration. I am sure that after I get more familiar with it, the setup of the configuration will go much smoother and my current configuration struggles will look silly in hindsight. Thanks again @Hansjoerg! – Sander_M May 19 '16 at 12:26