
I have a CSV file that contains millions of records and is around 2GB in size. My use case is to read the CSV file from S3 and process it. Please find my code below:

In the code below, I'm reading a file from an S3 bucket and passing the input stream directly to the Spring Batch FlatFileItemReader via reader.setResource(new InputStreamResource(inputStream));

With this implementation, I'm holding 2GB of content in memory while processing it, which is not efficient. Can someone please suggest an efficient way of reading a large file from an S3 bucket and processing it with Spring Batch?

I appreciate your help in advance! Thanks.

@Component
public class GetFileFromS3 {

    public S3ObjectInputStream downloadFile(String keyName, String bucketName, String region) {
        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withClientConfiguration(new ClientConfiguration())
                    .withRegion(region).build();

            S3Object s3object = s3Client.getObject(bucketName, keyName);
            return s3object.getObjectContent();
        } catch (AmazonServiceException e) {
            e.printStackTrace();
        }
        return null;
    }

}




@Configuration
@EnableBatchProcessing
public class SpringBatch {

    @Autowired
    private GetFileFromS3 getFileFromS3;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean(name = "csvFile")
    public Step step1() {
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        S3ObjectInputStream inputStream = getFileFromS3.downloadFile("employee.csv", "testBucket", "us-east-1");
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
        reader.setResource(new InputStreamResource(inputStream));
        reader.setLinesToSkip(1);
        reader.setLineMapper(new DefaultLineMapper() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(Employee.fields());
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                    {
                        setTargetType(Employee.class);
                    }
                });
            }
        });
        return reader;
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        // Pass-through placeholder processor
        return item -> item;
    }

    @Bean
    public ItemWriter<Employee> writer() {
        // Placeholder writer; replace with the real writer implementation
        return items -> items.forEach(System.out::println);
    }

}
Mike Marsh

1 Answer


By making use of the ResourceLoader, we can read files in S3 from an ItemReader just like any other resource. This lets the job read the S3 file in chunks instead of loading the entire file into memory.

With the ResourceLoader and the AmazonS3 client injected as dependencies, the reader configuration changes as shown below.

Replace values for sourceBucket and sourceObjectPrefix as needed.
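Note: resolving "s3://..." URLs through the ResourceLoader relies on an S3-aware protocol resolver being registered; that support typically comes from Spring Cloud AWS being on the classpath (an assumption about your setup, since it isn't shown here). The configuration below also autowires an AmazonS3 bean; a minimal sketch of such a bean, with the region as an assumption and credentials taken from the default provider chain, could look like this:

@Bean
public AmazonS3 amazonS3Client() {
    // Region is an assumption; credentials come from the default provider chain
    return AmazonS3ClientBuilder.standard()
            .withRegion(Regions.US_EAST_1)
            .build();
}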

@Autowired
private ResourceLoader resourceLoader;

@Autowired
private AmazonS3 amazonS3Client;

// READER
@Bean(destroyMethod="")
@StepScope
public SynchronizedItemStreamReader<Employee> employeeDataReader() {
    SynchronizedItemStreamReader<Employee> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    List<Resource> resourceList = new ArrayList<>();
    String sourceBucket = yourBucketName;
    String sourceObjectPrefix = yourSourceObjectPrefix;
    log.info("sourceObjectPrefix::"+sourceObjectPrefix);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(sourceBucket)
            .withPrefix(sourceObjectPrefix);
    ObjectListing sourceObjectsListing;
    do{
        sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries()){

            if (!(sourceFile.getSize() > 0)
                    || !sourceFile.getKey().endsWith(".csv")) {
                // Skip if the file is empty or its extension is not "csv"
                continue;
            }
            log.info("Reading "+sourceFile.getKey());
            resourceList.add(resourceLoader.getResource("s3://".concat(sourceBucket).concat("/")
                    .concat(sourceFile.getKey())));
        }
        listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
    }while(sourceObjectsListing.isTruncated());

    Resource[] resources = resourceList.toArray(new Resource[resourceList.size()]);
    MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setName("employee-multiResource-Reader");
    multiResourceItemReader.setResources(resources);
    multiResourceItemReader.setDelegate(employeeFileItemReader());
    synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
    return synchronizedItemStreamReader;
}

@Bean
@StepScope
public FlatFileItemReader<Employee> employeeFileItemReader()
{
    FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(Employee.fields());
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                {
                    setTargetType(Employee.class);
                }
            });
        }
    });
    return reader;
}

I have used MultiResourceItemReader as an example. This works even if there are multiple CSV files under the S3 prefix you are reading.

For processing only one CSV file in a location, it works just as well, with the Resource[] resources array containing a single entry.
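As a sketch of how this could be wired in: the reader above can replace reader() in the question's step1 definition. The chunk size below is illustrative, and processor() and writer() are assumed to be the beans from the question:

@Bean(name = "csvFile")
public Step step1() {
    return stepBuilderFactory.get("step1").<Employee, Employee>chunk(100)
            .reader(employeeDataReader())
            .processor(processor())
            .writer(writer())
            .build();
}

The SynchronizedItemStreamReader wrapper only becomes important if the step is made multi-threaded (for example by adding a TaskExecutor); for a single-threaded step the MultiResourceItemReader could be used as the reader directly.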

Vignesh