1

My goal is to move a PDF and an XML file from an input folder to an output folder once both files are available in the input folder. If one of the files is not available, the route should retry a couple of times and then move the remaining file to an .error folder.

I noticed that the File component's error handling seems to be affected by the aggregator and I don't know how exactly:

When throwing an exception after the aggregation, the File component moves the file to the .done folder instead of the .error folder, see (2) in code. However, when throwing the exception before the aggregation it works as expected, see (1).

My questions are:

  1. What is missing to make (2) work?
  2. Is it possible to achieve the same without throwing an exception? There's the discardOnCompletionTimeout option, but this doesn't work for me either.
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.commons.io.FilenameUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

/**
 * Reproduces the File component / aggregator interaction described in the question.
 *
 * <p>Three files are dropped into a temporary input folder. The route correlates them by
 * base file name, so {@code willMoveToDone.xml} and {@code willMoveToDone.pdf} form a
 * complete pair, while {@code shouldMoveToError.xml} has no partner and its aggregation
 * completes by timeout with a single element.
 *
 * <p>NOTE(review): presumably the aggregator finishes the incoming file exchange as soon as
 * it has been added to the aggregation, and the aggregated result continues on an exchange
 * of its own. That would explain why an exception thrown at (2) is not seen by the file
 * consumer (so {@code moveFailed} is not applied), whereas an exception thrown at (1) is.
 * Confirm against the Camel Aggregate EIP documentation.
 */
public class SimpleMockTest extends CamelTestSupport {
    /** Aggregation completion timeout in milliseconds; the mock wait time is derived from it. */
    public static final int COMPLETION_TIMEOUT = 1000;

    /**
     * Correlation key for the aggregator: the {@link Exchange#FILE_NAME} header with path and
     * extension stripped by {@link FilenameUtils#getBaseName(String)}, converted to the type
     * requested by the caller. Holds no instance state, hence a true constant.
     */
    private static final Expression CORRELATION_EXPRESSION = new Expression() {
        @Override
        public <T> T evaluate(Exchange exchange, Class<T> type) {
            final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
            final String baseName = FilenameUtils.getBaseName(fileName);
            return exchange.getContext().getTypeConverter().convertTo(type, baseName);
        }
    };

    /** Input folder polled by the route; {@code .done} and {@code .error} are created below it. */
    @TempDir
    Path tempDir;

    /**
     * Expects the complete pair to end up in {@code .done} and the unpaired XML file in
     * {@code .error}. The last expectation is the one that fails in the reported scenario.
     *
     * @throws Exception if a file cannot be created or the mock assertion is interrupted
     */
    @Test
    public void exceptionAfterAggregation() throws Exception {
        final MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
        // Wait longer than the completion timeout so the unpaired file's aggregation can time out.
        mockEndpoint.setResultWaitTime(COMPLETION_TIMEOUT * 3);
        mockEndpoint.expectedFileExists(tempDir.resolve(".done/willMoveToDone.xml"));
        mockEndpoint.expectedFileExists(tempDir.resolve(".done/willMoveToDone.pdf"));
        mockEndpoint.expectedFileExists(tempDir.resolve(".error/shouldMoveToError.xml")); // this fails

        createFile("willMoveToDone.xml", "<xml>1</xml>");
        createFile("willMoveToDone.pdf", "pdfFileContent");
        createFile("shouldMoveToError.xml", "<xml>2</xml>");

        mockEndpoint.assertIsSatisfied();
    }

    /**
     * Builds the route under test: consume xml/pdf files, aggregate them per base name into an
     * {@link ArrayList}, and either forward complete pairs to {@code mock:result} or throw.
     *
     * @return the route builder containing {@code fileAggregationRoute}
     */
    @Override
    protected RoutesBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("file://" + tempDir + "?include=.*\\.(xml|pdf)&moveFailed=.error&move=.done")
//                        .process(exchange -> {
//                            throw new RuntimeException("Files would be moved to .error as expected");     // (1)
//                        })
                        .aggregate(CORRELATION_EXPRESSION, AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
                        // Complete on whichever comes first: both files present, or the timeout.
                        .completionTimeout(COMPLETION_TIMEOUT)
                        .completionSize(2)

                        .choice()
                            // Anything other than exactly two entries means the pair is incomplete.
                            .when(simple("${body.size()} != 2"))
                                .throwException(RuntimeException.class, "Why doesn't the File component move this file to .error?")     // (2)
                        .otherwise()
                            .to("mock:result")
                        .routeId("fileAggregationRoute");
            }

        };
    }

    /**
     * Writes {@code content} as UTF-8 to {@code filename} inside {@link #tempDir}.
     *
     * @param filename name of the file to create, relative to the input folder
     * @param content  text content of the file
     * @throws IOException if the file cannot be written
     */
    private void createFile(String filename, String content) throws IOException {
        final Path target = tempDir.resolve(filename);
        Files.write(target, content.getBytes(StandardCharsets.UTF_8));
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>

<!-- Maven build for the Camel playground project that hosts the SimpleMockTest reproduction. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mycompany.app</groupId>
  <artifactId>camel-playground</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>my-app</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Sources are compiled for Java 11. -->
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>

  <!-- Imports the Camel BOM so the org.apache.camel dependencies below can omit their versions. -->
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-bom</artifactId>
        <version>3.11.3</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <!-- Camel artifacts: versions come from the imported camel-bom. -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test-junit5</artifactId>
    </dependency>
    <!-- SLF4J binding to log4j 1.2 for log output. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.32</version>
    </dependency>
    <!-- Provides FilenameUtils, used by the test to derive the correlation key. -->
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.11.0</version>
    </dependency>
  </dependencies>
</project>
dev-random
  • 741
  • 1
  • 7
  • 13
  • 1
    Since you seem to want to process a pair of files, you could also check out the [poll enrich](https://camel.apache.org/components/latest/eips/pollEnrich-eip.html) pattern to get the **matching** xml file for your pdf or vice versa. As I understand it, an aggregation strategy that completes on count, with a consumer endpoint accepting both pdf and xml files, would trigger even with 2 pdf files or 2 xml files, which might not be what you want. – Pasi Österman Oct 25 '21 at 10:12
  • Hi @PasiÖsterman, your answer is slightly off-topic but nonetheless interesting. So you would recommend something like: https://gist.github.com/zregvart/982ba400978349c1dd60412484d91038 ? This question is specifically about why the File component doesn't move the file to the .error folder when an exception is thrown _after_ the aggregation although this works when an exception is thrown _before_ the aggregation. – dev-random Oct 27 '21 at 19:26
  • 1
    That's why it's a comment, not an answer. As for your link: yes, something like that, but you can keep both files separate if you wish, for example by adding them both to a list, array or object. As for the aggregate problem, it probably cuts the connection to the original consumer endpoint, as it's kind of a "consumer endpoint" of its own, creating and eventually consuming the aggregated message based on the given strategy. I don't know if there's an option for it to support your use case. – Pasi Österman Oct 29 '21 at 06:42

0 Answers0