0

Hi I have a use case where I should read data from DB and try to combine rows. For example I have one order and many orderDetails. The output of my SQL Query is orderDetails. OrderNum is FK in orderDetails. So, now when I process orderDetails I have to filter by orderNum and consume an external end-point. In my case I am using RepositoryItemReader

MyReader.java

public RepositoryItemReader<List<POJO>> readCurrentSeasonOrder() {
// some code
}

MyProcessor.java

public POJO2 process(List<POJO> items) throws Exception {
// some code 
}

when I run my batch job I am getting the exception

POJO cannot be cast to java.util.List

I am unable to find any examples that fits my use case. Any help is very much appreciated.

Phanee M
  • 15
  • 4
  • Spring-Batch `ItemProcessor` processes per-item, not the entire list gathered from reader (the `ItemWriter` on the contrary, writes a list of items). So signature must be `public POJO2 process(POJO item)`. See also the [javadoc](https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/ItemProcessor.html#process-I-) – Michail Alexakis May 14 '22 at 11:48
  • @MichailAlexakis Do you have any suggestions on how I can achieve my use case? – Phanee M May 14 '22 at 14:17

2 Answers2

1

A "List of objects" is not a good encapsulation of an item in the chunk-oriented processing model of Spring Batch.

Option 1: You can change your query to join order and orderDetails and return items of type Order. This way, an item would be an Order.

Option2: Another way to do that is by using the driving query pattern. The idea is to make your reader return Orders, and use a processor to enrich orders with their details. This works for small/medium data sets, but performs poorly for large data sets (because of the additional query per item). This is related to the algorithm itself, not to Spring Batch per se.

You would prefer option 1 in which the database does the join in an efficient way and gives you back order items pre-filled with their details.

EDIT: Add sample for the driving query pattern

The following sample shows the idea of the driving query pattern. The reader returns items of type Person. The processor enriches person items with their addresses. Look how the reader fetches only the id and name of the person, but not the address. You can adapt it to Order and OrderDetails in your case.

/*
 * Copyright 2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.sample;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

/**
 * @author Mahmoud Ben Hassine
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person")
                .beanRowMapper(Person.class)
                .build();
    }

    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        return new ItemProcessor<Person, Person>() {
            @Autowired
            private JdbcTemplate jdbcTemplate;

            @Override
            public Person process(Person person) {
                Address address = jdbcTemplate.queryForObject("select * from address where personId = ?", new Object[]{person.getId()}, new BeanPropertyRowMapper<>(Address.class));
                person.setAddress(address);
                return person;
            }
        };
    }

    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            for (Person item : items) {
                System.out.println("item = " + item);
            }
        };
    }
    
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(2)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
        jdbcTemplate.update("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
        jdbcTemplate.update("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
        jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
        jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
        jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
        jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    public static class Person {
        private long id;
        private String name;
        private Address address;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Address getAddress() {
            return address;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", address=" + address +
                    '}';
        }
    }

    public static class Address {
        private int id;
        private int personId;
        private String street;

        public Address() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getPersonId() {
            return personId;
        }

        public void setPersonId(int personId) {
            this.personId = personId;
        }

        @Override
        public String toString() {
            return "Address{" +
                    "id=" + id +
                    ", street='" + street + '\'' +
                    '}';
        }
    }

}
Mahmoud Ben Hassine
  • 28,519
  • 3
  • 32
  • 50
  • I cannot use Option1 because in the ItemProcessor I will have to info from orderDetails and Option2 will cause performance problem because I have large data. Can I do this instead? Step 1: read orderDetails from db, process (practically do nothing), writer - get unique orderNums from List and create my target object and add to a list, store List into step context. Step 2: read from context.........., write to DB. Is this a good approach? I am currently stuck where I have to read from context (which gives instance of type object) and convert to List – Phanee M May 14 '22 at 22:38
  • Can you also suggest a good example for implementing driving query pattern – Phanee M May 14 '22 at 22:48
  • In option 1 you don't need an item processor, items are returned by the reader from the database with their details already. In what you suggest you have: `process (practically do nothing)`, and `store List into step context`: this is not a good idea because the execution context is persisted between steps, and for large data sets as you have this will be a performance killer. I will add a sample for the driving query pattern. – Mahmoud Ben Hassine May 16 '22 at 09:49
  • Got you. It makes sense now. I think implementing the driving query pattern is the way to go. :) – Phanee M May 16 '22 at 12:36
1

based on the concepts and principles of Spring Batch, it is generally recommended to encapsulate each item in a domain-specific object rather than using a generic list of objects.

but in any case, if you want to do this I have written an exmple of it:

first, you should create a custom item reader :

public class CustomItemReader implements ItemReader<List<MyData>>, StepExecutionListener {
@PersistenceContext
private EntityManager entityManager;
private final int pageSize = 100;
private Long totalRecords;
private int currentPage = 0;

@BeforeStep
public void beforeStep(StepExecution stepExecution) {

    Query countQuery = entityManager.createQuery("SELECT COUNT(md) FROM MyData md WHERE  md.status = 'Active'");
    totalRecords = (Long) countQuery.getSingleResult();
}

@Override
public ExitStatus afterStep(@NotNull StepExecution stepExecution) {
    return null;
}

@Override
@Transactional(readOnly = true)
public List<MyData> read() {
    if (totalRecords == null || totalRecords == 0 || currentPage >= Math.ceil(1.0 * totalRecords / pageSize)) {
        return null;
    }
    TypedQuery<MyData> dataQuery = entityManager.createQuery("SELECT md FROM MyData md WHERE md.status = 'Active'", MyData.class);
    dataQuery.setFirstResult(currentPage * pageSize);
    dataQuery.setMaxResults(pageSize);
    List<MyData> listOfMyData = dataQuery.getResultList();
    currentPage++;
    return listOfMyData;
}}

then you can write your item processor:

public class MyItemProcessor implements ItemProcessor<List<MyData>, List<MyData>> {// do your logic}

the return type can be anything else but if you return List you should take List<List> as input in the item writer.

note: your chunk size is defined in the item reader in this example: private final int pageSize = 100 and you need to set chunk in your job config to 1 so that every time you get 1 list of data that have 100 items in it

mahdi
  • 36
  • 4