
Aim - Decrypt a .pgp-encrypted file, read the data as a stream, transform it to the vendor's requirements, encrypt it as a stream, and write it to a file.

Logic - A custom reader, writer, and tasklets. The decrypted/encrypted data is stored in the ExecutionContext and passed between steps.

Works for - Small files (~1 MB).

Issue faced - With a larger file (~10 MB, ~10K records), the reading step succeeds, but when the writer begins writing the data as an encrypted file it fails with java.lang.OutOfMemoryError: Java heap space.

Code snippet -

<job id="testJob" xmlns="http://www.springframework.org/schema/batch">

    <!-- Read Encrypted file and Decrypt -->
    <batch:step id="decryptFile" next="readAndWriteData">
        <batch:tasklet ref="fileDecryptionTasklet">
            <batch:listeners>
                <batch:listener ref="decryptFileListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>

    <!-- Read data from decryption step and write to Stream -->
    <batch:step id="readAndWriteData" next="encryptFile">
        <batch:tasklet>
            <batch:chunk reader="hrdsCustomReader" processor="Processor"
                writer="CustomWriter" commit-interval="${.ftp.comit.interval}" />
            <batch:listeners>
                <batch:listener ref="encryptFileListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>

    <!-- Write to vendor specific file -->
    <batch:step id="encryptFile">
        <batch:tasklet ref="fileEncryptionTasklet" />
    </batch:step>

</job>
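
The listeners wired into the steps above are presumably what carry the data across step boundaries. As a hypothetical sketch (the actual listener classes are not shown here), Spring Batch's stock ExecutionContextPromotionListener would do this by promoting keys from the step ExecutionContext to the job ExecutionContext:

<!-- Hypothetical: promote the "DecryptedData" key written by the decryptFile
     step so the next step can read it from the job ExecutionContext;
     encryptFileListener would do the same for "EncryptedData". -->
<bean id="decryptFileListener"
    class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
    <property name="keys" value="DecryptedData" />
</bean>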

Tasklet and custom reader/writer code snippets -


@Override
public String read() throws Exception, UnexpectedInputException,
        ParseException {

    // Fetch the decrypted payload stored by the previous step; note that
    // the entire file comes back as a single String item.
    decryptedData = (String) stepExecution.getJobExecution()
            .getExecutionContext().get("DecryptedData");
    if (decryptedData != null) {
        // Clear the key so the next read() returns null and ends the step.
        stepExecution.getJobExecution().getExecutionContext()
                .put("DecryptedData", null);
    }
    return decryptedData;
}

 
public void write(List<? extends String> items) throws Exception {
    logger.info("Begin writing data as an Encrypted File");

    // Accumulate every item into a single in-memory buffer; with one huge
    // item this is where the whole file sits on the heap.
    for (String element : items) {
        lineBuffer.append(element).append(LINE_SEPARATOR);
    }
    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("EncryptedData", lineBuffer);
}


public RepeatStatus execute(StepContribution step, ChunkContext chunk)
        throws Exception {

    InputStream encryptedStream = new FileInputStream(inputdirectory);

    // Note: both setHeader() calls below use the same header name, so the
    // second value (DECRYPT_STREAM_OPERATION) overwrites "decryptAndVerify".
    Message encryptMessage = MessageBuilder
            .withPayload(encryptedStream)
            .setHeader(
                    FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
                    "decryptAndVerify")
            .setHeader(
                    FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
                    EncryptionUtil.DECRYPT_STREAM_OPERATION)
            .setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER,
                    filename).build();

    InputStream decryptedStream = pgptransformer
            .doTransformStream(encryptMessage);
    // Materialises the entire decrypted file as one String on the heap.
    String strData = IOUtils.toString(decryptedStream, "UTF-8");
    decryptedStream.close();
    encryptedStream.close();

    chunk.getStepContext().getStepExecution().getExecutionContext()
            .put("DecryptedData", strData);

    return RepeatStatus.FINISHED;
}


public RepeatStatus execute(StepContribution step, ChunkContext chunk)
        throws Exception {

    // Fetch the accumulated buffer; toString() and getBytes() then create
    // two further full copies of the data.
    lineBuffer = (StringBuffer) chunk.getStepContext()
            .getJobExecutionContext().get("EncryptedData");
    byte[] bytes = lineBuffer.toString().getBytes();
    InputStream inputStream = new ByteArrayInputStream(bytes);

    // As in the decryption tasklet, the second setHeader() call on the same
    // header name overwrites "signAndEncrypt".
    Message encryptMessage = MessageBuilder
            .withPayload(inputStream)
            .setHeader(PGPFileTransformer.OUTPUT_FILE_FOLDER,
                    outputdirectory)
            .setHeader(
                    FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
                    "signAndEncrypt")
            .setHeader(
                    FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
                    EncryptionUtil.ENCRYPT_STREAM_OPERATION)
            .setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER,
                    filename).build();

    pgptransformer.doTransform(encryptMessage);
    inputStream.close();

    chunk.getStepContext().getStepExecution().getExecutionContext()
            .put("EncryptedData", null);

    return RepeatStatus.FINISHED;
}

I'd appreciate it if somebody could help resolve this issue.

  • You are storing the content in memory; that is fine for small objects, not for large ones. Don't store it in memory. You should create an InputStream that can decrypt/encrypt files; that way encryption/decryption is handled transparently and you could use a FlatFileItemReader/Writer to process items. – M. Deinum Aug 28 '14 at 09:50
  • Hi Deinum, thanks for the reply. Due to the sensitivity of the data, I'm not supposed to write the decrypted data to a file. Hence I decrypted the data to an InputStream and converted it to a String to store in the ExecutionContext. Later I fetch the String in the next step, process it, and again add it to the ExecutionContext as a StringBuffer to write onto an encrypted file. – P.D Aug 28 '14 at 10:58
  • Where do I say that you need to store the decrypted data… You read encrypted data, process, and write/encrypt again. You don't want to keep the whole file in memory. What is even worse, you are keeping it about 4 to 5 times in memory: Read, byte[], StringBuffer, String… Those are all copies, increasing and increasing your memory usage. – M. Deinum Aug 28 '14 at 11:14
  • http://stackoverflow.com/questions/6827725/how-to-decrypt-a-signed-pgp-encrypted-file might be of some help. – M. Deinum Aug 28 '14 at 11:18
  • Thanks, the reason I used String/StringBuffer was because I could not directly put() the InputStream with decrypted data into the step's ExecutionContext. Also, just to clarify - how do I pass the InputStream to the next step in Spring Batch? – P.D Aug 28 '14 at 13:40
  • You don't pass the `InputStream`; your item reader treats it just as any other input stream: read a row (or rows, depending on what is in the file) and pass that to the processor, which hands it off to the writer. You basically end up with 1 step instead of 3. – M. Deinum Aug 28 '14 at 18:56
  • Thanks Deinum, for the tip. I have achieved the task with just one step, but tweaked the encryption/decryption logic to increase performance. – P.D Sep 02 '14 at 07:04
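
A minimal sketch of the single-step approach Deinum describes above, assuming the question's pgptransformer can hand back a decrypting InputStream (the helper name and variables here are illustrative, not from the question): wrapping that stream in an InputStreamResource lets a stock FlatFileItemReader stream records one at a time instead of holding the whole file in memory.

import java.io.InputStream;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.core.io.InputStreamResource;

// Hypothetical helper: build a streaming reader over an already-decrypting
// InputStream (e.g. the one returned by pgptransformer.doTransformStream(...)).
public FlatFileItemReader<String> createStreamingReader(InputStream decryptedStream) {
    FlatFileItemReader<String> reader = new FlatFileItemReader<String>();
    reader.setResource(new InputStreamResource(decryptedStream));
    reader.setLineMapper(new PassThroughLineMapper()); // one plain-text record per line
    reader.open(new ExecutionContext());
    return reader;
}

read() then returns one decrypted record at a time; with an analogous encrypting OutputStream on the writer side, the three steps collapse into one chunk-oriented step and no full copy of the file ever sits on the heap.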

1 Answer


Was able to decrypt and process 100K records in under 2 minutes.

Logic - Process in chunks: 200 records are encrypted into one line of the output file.

Posting the code below -

Batch Config-

<job id="aic-batch-xxx-ftp" xmlns="http://www.springframework.org/schema/batch">

    <!-- Read data, decrypt, process and write to encrypted file -->
    <batch:step id="readAndWriteData">
        <batch:tasklet>
            <batch:chunk reader="xxxCustomReader" processor="xxxFileProccessor"
                writer="xxxCustomWriter" commit-interval="${aic.batch.xxx.ftp.comit.interval}" />
        </batch:tasklet>
    </batch:step>

</job>

Reader Logic -

public StringBuffer read() throws Exception {

    StringBuffer decryptedData = new StringBuffer();
    String strLine;

    PGPLib pgp = new PGPLib();
    KeyStore keyStore = new KeyStore("xxx.keystore", "xxx");

    long startTime = System.currentTimeMillis();
    // Read and decrypt the file one line at a time. Each line is one
    // PGP-encrypted chunk of records whose armored line breaks were masked
    // as NEW_LINE by the writer, so restore them before decrypting.
    if ((strLine = bufferedReader.readLine()) != null) {
        strLine = strLine.replace("NEW_LINE", "\r\n");
        decryptedData.append(pgp.decryptString(strLine, keyStore, "xxx"));
        long endTime = System.currentTimeMillis();
        logger.debug("Total time taken = " + (endTime - startTime) + " msec");
        return decryptedData;
    } else {
        return null;
    }
}

Writer Logic -

public void write(List<? extends StringBuffer> items) throws Exception {
    logger.debug("Begin writing data as an Encrypted File");

    for (StringBuffer element : items) {
        encrypt(element);
        count++;
    }
}

public void encrypt(StringBuffer element) throws PGPException, IOException {

    PGPLib pgp = new PGPLib();
    KeyStore keyStore = new KeyStore("xxx.keystore", "xxx");

    String strLine = element.toString();
    StringBuffer buffer = new StringBuffer("");
    // NOTE: as posted, 'i' and 'buffer' are method-local, so 'i % 200 == 0'
    // is always true here and every element is encrypted on its own; for the
    // 200-records-per-line batching described above they would need to be
    // instance fields that survive between calls.
    int i = 0;
    long startTime = System.currentTimeMillis();

    if (i % 200 == 0) {
        if (i != 0) {
            // Flush the previous 200-record batch as one encrypted line,
            // masking the line breaks of the armored output as NEW_LINE.
            String encryptString = pgp.encryptString(buffer.toString(),
                    keyStore, "xxx");
            encryptString = encryptString.replace("\r\n", "NEW_LINE");
            bufferedWriter.write(encryptString);
            bufferedWriter.newLine();
        }
        buffer = new StringBuffer(strLine);
    } else {
        buffer.append("\r\n").append(strLine);
    }
    i++;
    if (buffer.length() > 0) {
        String encryptString = pgp.encryptString(buffer.toString(),
                keyStore, "xxx");
        encryptString = encryptString.replace("\r\n", "NEW_LINE");
        bufferedWriter.write(encryptString);
        bufferedWriter.newLine();
    }

    long endTime = System.currentTimeMillis();
    logger.debug("Total time taken = " + (endTime - startTime) + " msec");
}
  • Looks like there is still room for improvement. You have some duplication in the `encrypt` method, and you are recreating the `PGPLib` and `KeyStore` objects each time you need them (in both the reader and the writer); maybe you can create a step-scoped dependency out of that? I would also suspect that the `i % 200` should be tied to the commit interval somehow. One quick win, IMHO, is to use a `StringBuilder` instead of a `StringBuffer`; the latter is synchronized, the first isn't. – M. Deinum Sep 02 '14 at 07:27
  • Just a thought: wouldn't it be possible to create a reader which just reads the file line by line (encrypted) and passes it to a chain of processors which decrypt-process-encrypt (or to a single processor)? That way your reader only reads and your writer only needs to write. You would limit the number of objects created/passed along (especially the PGP stuff), which should be easier on the GC. – M. Deinum Sep 02 '14 at 07:30
  • The challenge I faced was: with normal encryption it encrypts the entire file as a whole, so I cannot read it line by line; it's all unreadable characters. So I encrypted 200 (or N) records into one line and replaced \r\n with NEW_LINE; that way one chunk (200 or N records) can be identified. The commit interval is 1: the reader reads one encrypted row at a time, processes it, and writes it to the file. – P.D Sep 02 '14 at 07:43
  • Ah, that's a bummer. However, that figures, as the whole file is encrypted; hence my suggestion of creating an `InputStream` and `OutputStream` that handle that transparently. I cannot imagine that there isn't something like that around. It would increase performance and simplify your code greatly. – M. Deinum Sep 02 '14 at 07:50
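
A hypothetical sketch of the step-scoped dependency suggested in the comments, assuming the KeyStore class used above is DidiSoft's com.didisoft.pgp.KeyStore (the bean id is illustrative). This builds the key store once per step execution instead of once per read()/encrypt() call:

<!-- Hypothetical: the step scope is available when the batch namespace is
     active; inject this bean into the reader and writer instead of
     constructing a new KeyStore for every item. -->
<bean id="pgpKeyStore" class="com.didisoft.pgp.KeyStore" scope="step">
    <constructor-arg value="xxx.keystore" />
    <constructor-arg value="xxx" />
</bean>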