This question might seem to be a duplicate of this one, but it is not.

My requirement is to read data from a database using JdbcPagingItemReader, process each record individually, and, in the writer, create a separate JSON file for each processed item, with the file name id_of_record_json_file.txt.

For example, if the reader reads 100 records, then 100 JSON files have to be created.

What is the best way to do this? Can we use Spring Batch for this?

Update 1:

As per @Mahmoud's answer, a tasklet can be used. I have also tried implementing a custom ItemWriter in a chunk-oriented step, and this also seems to work:

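    @Override
    public void write(final List<? extends Person> persons) throws Exception {
        for (Person person : persons) {
            // derive the file name from the record id so that each item gets its own file
            // (a fixed file name would be overwritten on every iteration)
            objectMapper.writeValue(new File("D:/cp/" + person.getId() + ".json"), person);
        }
    }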
ravicandy1234
  • Yes, you can. Use a SQL reader, then a processor that creates the JSON and a writer that writes the JSON file – Simon Martinelli Jul 23 '20 at 09:09
  • @SimonMartinelli: I need some help/guidance on how to generate individual JSON files with different names. I am aware that a Spring Batch writer writes the entire chunk to a single file – ravicandy1234 Jul 23 '20 at 09:33

1 Answer

Using a chunk-oriented step won't work, because there will be a single item writer on which the resource is set upfront and stays fixed during the entire step. Using a composite item writer might work, but you need to know how many distinct writers to create and configure upfront.

The most straightforward option I see is to use a tasklet, something like:

import java.util.Collections;
import java.util.HashMap;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public JdbcPagingItemReader<Person> itemReader() {
        return new JdbcPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .beanRowMapper(Person.class)
                .selectClause("select *")
                .fromClause("from person")
                .sortKeys(new HashMap<String, Order>() {{ put("id", Order.DESCENDING);}})
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .tasklet(new MyTasklet(itemReader()))
                        .build())
                .build();
    }
    
    private static class MyTasklet implements Tasklet {

        private boolean readerInitialized;
        private JdbcPagingItemReader<Person> itemReader;

        public MyTasklet(JdbcPagingItemReader<Person> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
            if (!readerInitialized) {
                itemReader.open(executionContext);
                readerInitialized = true;
            }
            Person person = itemReader.read();
            if (person == null) {
                itemReader.close();
                return RepeatStatus.FINISHED;
            }
            // process the item
            process(person);
            // write the item in its own file (dynamically generated at runtime)
            write(person, executionContext);
            // save current state in execution context: in case of restart after failure, the job would resume where it left off.
            itemReader.update(executionContext);
            return RepeatStatus.CONTINUABLE;
        }

        private void process(Person person) {
            // do something with the item
        }
        
        private void write(Person person, ExecutionContext executionContext) throws Exception {
            FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                    .resource(new FileSystemResource("person" + person.getId() + ".csv"))
                    .name("personItemWriter")
                    .delimited()
                    .names("id", "name")
                    .build();
            itemWriter.open(executionContext);
            itemWriter.write(Collections.singletonList(person));
            itemWriter.close();
        }
        
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    static class Person {
        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }

}

This example reads 10 persons from a database table and generates 10 CSV files (person1.csv, person2.csv, etc.).
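Since the question asks for JSON rather than CSV output, the per-item write step can be sketched as follows. This is a minimal, dependency-free sketch, not the answer's actual code: the class name `PerRecordJsonFiles`, the trimmed `Person` stand-in, the hand-rolled `toJson` helper and the `<id>.json` file name pattern are all assumptions made for illustration. In a real job, the `write` method would more likely live in the tasklet above or in a custom item writer, and serialization would be delegated to a JSON library.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class PerRecordJsonFiles {

    // Hypothetical stand-in for the Person item used above, trimmed to two fields.
    static class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Hand-rolled serialization to keep the sketch self-contained.
    // Only backslashes and double quotes are escaped; a JSON library
    // would also handle control characters and nested objects.
    static String toJson(Person p) {
        String escaped = p.name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"id\":" + p.id + ",\"name\":\"" + escaped + "\"}";
    }

    // One file per item: the file name is derived from the item's id
    // at write time, so nothing has to be configured upfront.
    static void write(Path outputDir, List<? extends Person> persons) throws IOException {
        Files.createDirectories(outputDir);
        for (Person person : persons) {
            Path target = outputDir.resolve(person.id + ".json");
            Files.write(target, toJson(person).getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("persons");
        write(dir, Arrays.asList(new Person(1, "foo1"), new Person(2, "foo2")));
        // prints {"id":1,"name":"foo1"}
        System.out.println(new String(Files.readAllBytes(dir.resolve("1.json")), StandardCharsets.UTF_8));
    }
}
```

Because the target path is computed inside the loop, the number of output files does not need to be known before the step starts.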

Mahmoud Ben Hassine
  • Thanks, I really appreciate your efforts :), but do you think using Spring Batch is the ideal solution for this? If the query returns a lot of data, will it affect performance? Also, I assume we cannot use any of the Spring Batch scaling options here? If I am using a composite writer in a chunk step, how can we pass the file name dynamically? – ravicandy1234 Jul 23 '20 at 13:54
  • Yes, I think Spring Batch is a good option. What you get immediately is that in case of failure, the job will restart from where it left off (you would need to add `itemReader.update(executionContext);` after processing/writing an item in my previous example). For the composite writer approach, you can pass the file name dynamically to the composite writer, but only at configuration time, not at runtime. You can't change a step's writer after starting it. So if you want the composite writer approach, you need to pre-calculate how many writers you need upfront and configure them in the composite. – Mahmoud Ben Hassine Jul 23 '20 at 14:00
  • In your example, you would need to determine upfront that you have 100 distinct items (i.e. 100 files to create), create 100 item writer instances and configure them in a composite item writer that you set on the step. – Mahmoud Ben Hassine Jul 23 '20 at 14:01
  • You can still use scaling options: 1) Multi-threaded step: set `.taskExecutor(yourTaskExecutor)` on the tasklet and make sure everything is thread-safe. 2) Partitioning: give each tasklet a different partition and make them work in parallel. – Mahmoud Ben Hassine Jul 23 '20 at 14:08
  • The composite writer approach doesn't seem right to me, because if we have 1 million records, we need to pre-compute the count, and creating that many objects will kill performance. – ravicandy1234 Jul 23 '20 at 14:09
  • That's why I think the most straightforward option for your case is to use a tasklet. I updated the answer with `itemReader.update(executionContext);` where it should be added. If the answer helped, please accept it. Thank you. – Mahmoud Ben Hassine Jul 23 '20 at 14:12
  • thank you :) I think the above code should move to spring samples :) – ravicandy1234 Jul 23 '20 at 14:14
  • Hi Mahmoud, can't we use a custom writer in this case? – ravicandy1234 Jul 28 '20 at 10:28