0

I am testing my integration flow with two scenarios. The flow reads an XML file and tries to validate it: a valid XML file should be moved to the processed directory, and an invalid one to the error directory.

This is the main integration flow:

package com.stackoverflow.questions.config;


import static java.util.Arrays.asList;

import com.stackoverflow.questions.dto.WriteResult;
import com.stackoverflow.questions.handler.FileReaderHandler;
import com.stackoverflow.questions.handler.StudentErrorHandler;
import com.stackoverflow.questions.handler.StudentWriterHandler;
import com.stackoverflow.questions.service.DirectoryManagerService;
import com.stackoverflow.questions.transformer.FileToStudentTransformer;
import java.io.File;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.DirectoryScanner;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.RecursiveDirectoryScanner;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;

/**
 * Spring Integration configuration for the student file pipeline.
 *
 * <p>Files produced by the polled file source are passed through {@code fileReaderChannel} to
 * the reader handler, transformed, handed to the writer handler, and finally routed on
 * {@code WriteResult::isWriten} to either the "processed" or the "error" sub-flow.
 */
@Configuration
@EnableIntegration
@RequiredArgsConstructor
public class MainIntegrationFlow {

  /** Regular expression applied to file names by the directory scanner's filter. */
  @Value("${regex.filename.pattern}")
  private String regexFileNamePattern;

  /** Directory handed to the file-reading message source. */
  @Value("${root.file.dir}")
  private String rootFileDir;

  /** Rate of the default poller, interpreted in seconds. */
  @Value("${default.polling.rate}")
  private Long defaultPollingRate;

  private final DirectoryManagerService directoryManagerService;
  private final StudentErrorHandler studentErrorHandler;
  private final FileReaderHandler fileReaderHandler;
  private final StudentWriterHandler studentWriterHandler;
  private final FileToStudentTransformer fileToStudentTransformer;

  /**
   * Builds the main flow: source -> fileReaderChannel -> read -> transform -> write -> route.
   *
   * @param mainFileReadingSourceMessage polled source of {@link File} messages
   * @param fileReaderChannel entry channel of the flow; messages sent to it directly (for
   *     example from tests) are processed exactly like polled files
   * @return the assembled integration flow
   */
  @Bean("mainStudentIntegrationFlow")
  public IntegrationFlow mainStudentIntegrationFlow(
      @Qualifier("mainFileReadingSourceMessage") MessageSource<File> mainFileReadingSourceMessage,
      @Qualifier("fileReaderChannel") MessageChannel fileReaderChannel) {
    return IntegrationFlows.from(mainFileReadingSourceMessage)
        // Wire the injected channel into the flow so that fileReaderHandler subscribes to it;
        // otherwise messages sent to fileReaderChannel have no consumer.
        .channel(fileReaderChannel)
        .handle(fileReaderHandler)
        .transform(fileToStudentTransformer)
        .handle(studentWriterHandler)
        .<WriteResult, Boolean>route(WriteResult::isWriten,
            mapping -> mapping
                .subFlowMapping(true, moveToProcessedDirFlow())
                .subFlowMapping(false, moveToErrorDirFlow()))
        .get();
  }


  /**
   * Sub-flow for successful writes: asks the directory manager to move the file named in the
   * {@link WriteResult} payload to the processed directory.
   *
   * @return the sub-flow definition
   */
  public IntegrationFlow moveToProcessedDirFlow() {
    return flow -> flow.handle(message ->
        directoryManagerService
            .moveToProcessedDir(((WriteResult) message.getPayload()).getFilename()));
  }

  /**
   * Sub-flow for failed writes: passes the message through {@code studentErrorChannel} and then
   * asks the directory manager to move the file named in the {@link WriteResult} payload to the
   * error directory.
   *
   * @return the sub-flow definition
   */
  public IntegrationFlow moveToErrorDirFlow() {
    return flow -> flow.channel("studentErrorChannel")
        .handle(message ->
            directoryManagerService
                .moveToErrorDir(((WriteResult) message.getPayload()).getFilename()));
  }

  /**
   * Flow that consumes {@code errorChannel} and delegates every message to the student error
   * handler.
   *
   * @return the error-handling flow
   */
  @Bean(name = "errorHandlerMainFlow")
  public IntegrationFlow errorHandlerMainFlow() {
    return IntegrationFlows.from("errorChannel")
        .handle(studentErrorHandler)
        .get();
  }

  /**
   * Default poller, registered under {@link PollerMetadata#DEFAULT_POLLER}: fixed rate of
   * {@code defaultPollingRate} seconds.
   *
   * @return the poller metadata
   */
  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata mainPollerMetadata() {
    return Pollers.fixedRate(defaultPollingRate, TimeUnit.SECONDS)
        // NOTE(review): the meaning of 0 here depends on the Spring Integration version in use
        // (unlimited vs. polling disabled) - confirm against the project's version.
        .maxMessagesPerPoll(0)
        .get();
  }

  /**
   * Direct channel used as the entry point of the main flow.
   *
   * @return the channel bean
   */
  @Bean(name = "fileReaderChannel")
  public MessageChannel fileReaderChannel() {
    return MessageChannels.direct().get();
  }

  /**
   * Recursive directory scanner whose filter combines an accept-once filter with a file-name
   * regex filter built from {@code regexFileNamePattern}.
   *
   * @return the configured scanner
   */
  @Bean("mainDirectoryScanner")
  public DirectoryScanner mainDirectoryScanner() {
    DirectoryScanner recursiveDirectoryScanner = new RecursiveDirectoryScanner();

    CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<>(
        asList(new AcceptOnceFileListFilter<>(),
            new RegexPatternFileListFilter(regexFileNamePattern)));

    recursiveDirectoryScanner.setFilter(compositeFileListFilter);
    return recursiveDirectoryScanner;
  }

  /**
   * File-reading message source rooted at {@code rootFileDir} and using the given scanner.
   *
   * @param mainDirectoryScanner scanner that lists and filters candidate files
   * @return the message source
   */
  @Bean("mainFileReadingSourceMessage")
  public MessageSource<File> mainFileReadingSourceMessage(
      @Qualifier("mainDirectoryScanner") DirectoryScanner mainDirectoryScanner) {
    FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
    fileReadingMessageSource.setDirectory(new File(rootFileDir));
    fileReadingMessageSource.setScanner(mainDirectoryScanner);

    return fileReadingMessageSource;
  }
}

However, when I run the tests, the first one succeeds but the second fails, and I can see from the logs that the message does not go through the integration flow for the second test:

package com.stackoverflow.questions;

import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

import com.stackoverflow.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;

@SpringBootTest
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
public class MainFlowIntegrationTests {


  private static final String MOCK_FILE_DIR = "intFiles/";
  private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
  private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";

  @Autowired
  private MessageChannel fileReaderChannel;

  @Autowired
  private DirectoryManagerService directoryManagerService;

  private File queueDir;
  private File processed;
  private File error;

  @BeforeEach
  public void setup() throws IOException {
    createRequiredDirectories();
    moveFilesToQueueDir();
    injectProperties();
  }

  @AfterEach
  public void tearDown() throws IOException {
    deleteRequiredDirectories();
  }

  @Test
  public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a valid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());

    // Then: the valid XML file should be sent to the processedDir
    await().until(() -> processed.list().length == 1);
  }

  @Test
  public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a invalid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());

    // Then: the invalid XML file should be sent to the errorDir
    await().until(() -> error.list().length == 1);
  }

  private void injectProperties() {
    ReflectionTestUtils
        .setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
    ReflectionTestUtils
        .setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
  }

  private void moveFilesToQueueDir() throws IOException {
    File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();

    for (String filename : intFiles.list()) {
      FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
    }
  }

  private void createRequiredDirectories() throws IOException {
    queueDir = Files.createTempDirectory("queueDir").toFile();
    processed = Files.createTempDirectory("processedDir").toFile();
    error = Files.createTempDirectory("errorDir").toFile();
  }

  private void deleteRequiredDirectories() throws IOException {
    forceDelete(queueDir);
    forceDelete(processed);
    forceDelete(error);
  }

}

StackTrace:



org.awaitility.core.ConditionTimeoutException: Condition with lambda expression in com.stackoverflow.questions.MainFlowIntegrationTests was not fulfilled within 10 seconds.

    at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
    at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
    at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:864)
    at com.stackoverflow.questions.MainFlowIntegrationTests.readingFileAndMoveItToProcessedDir(MainFlowIntegrationTests.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


The second test fails unless I either reload the application context before each test (using @DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)) or autowire my integration flow as a StandardIntegrationFlow and start/stop it manually, as below:

package com.stackoverflow.questions;

import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

import com.stackoverflow.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;

/**
 * Integration tests for the main student flow that share one application context and instead
 * start the flow before, and stop it after, every test method.
 */
@SpringBootTest
public class MainFlowIntegrationTests {


  private static final String MOCK_FILE_DIR = "intFiles/";
  private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
  private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";

  @Autowired
  private MessageChannel fileReaderChannel;

  @Autowired
  private StandardIntegrationFlow mainStudentIntegrationFlow;

  @Autowired
  private DirectoryManagerService directoryManagerService;

  private File queueDir;
  private File processed;
  private File error;

  /**
   * Prepares per-test temp directories, copies the fixtures into the queue directory, points the
   * directory manager at the temp directories and starts the flow.
   */
  @BeforeEach
  public void setup() throws IOException {
    createRequiredDirectories();
    // The helper defined in this class is copyFilesToQueueDir(); it copies (not moves) the
    // fixtures so the originals stay in the resources directory.
    copyFilesToQueueDir();
    injectProperties();
    mainStudentIntegrationFlow.start();
  }

  /** Stops the flow and deletes the temp directories created for the test. */
  @AfterEach
  public void tearDown() throws IOException {
    mainStudentIntegrationFlow.stop();
    deleteRequiredDirectories();
  }

  @Test
  public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a valid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());

    // Then: the valid XML file should be sent to the processedDir
    await().until(() -> processed.list().length == 1);
  }

  @Test
  public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a invalid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());

    // Then: the invalid XML file should be sent to the errorDir
    await().until(() -> error.list().length == 1);
  }

  /** Overrides the directory manager's target directories with the per-test temp directories. */
  private void injectProperties() {
    ReflectionTestUtils
        .setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
    ReflectionTestUtils
        .setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
  }

  /** Copies every file from the classpath fixture directory into the queue directory. */
  private void copyFilesToQueueDir() throws IOException {
    File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();

    for (String filename : intFiles.list()) {
      FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
    }
  }

  /** Creates fresh temp directories for the queue, processed and error locations. */
  private void createRequiredDirectories() throws IOException {
    queueDir = Files.createTempDirectory("queueDir").toFile();
    processed = Files.createTempDirectory("processedDir").toFile();
    error = Files.createTempDirectory("errorDir").toFile();
  }

  /** Deletes the temp directories created by {@link #createRequiredDirectories()}. */
  private void deleteRequiredDirectories() throws IOException {
    forceDelete(queueDir);
    forceDelete(processed);
    forceDelete(error);
  }

}

Moaad
  • 13
  • 4
  • We need to understand what your flow does. It is not fully clear from the test code what your logic is. Please consider sharing the flow you are testing. – Artem Bilan Sep 20 '20 at 14:33
  • sorry for the mistake @ArtemBilan, you can check the flow now – Moaad Sep 21 '20 at 15:10
  • I'm going to ask you questions for your flow. Let's see if we can fix it after answers! So, do you consume a `studentErrorChannel` somehow else, not only in that `moveToErrorDirFlow`? In fact: do you need that channel at all? Doesn't look like your `moveToProcessedDirFlow` has some interim channels... – Artem Bilan Sep 21 '20 at 15:17
  • ` MessageChannels.queue("fileReaderChannel")` - why `queue`? According your logic you don't need a queue in between. – Artem Bilan Sep 21 '20 at 15:18
  • You can turn on the DEBUG logging level for `org.springframework.integration` to see in the logs how your messages travel. – Artem Bilan Sep 21 '20 at 15:22
  • Why do you need that `moveFilesToQueueDir()` logic in the test? – Artem Bilan Sep 21 '20 at 15:29
  • I will try to answer your questions in the following points: yes, I am using `studentErrorChannel` in `StudentErrorHandler`, a message handler which gets the exception from the errorChannel and sends it to `studentErrorChannel`. – Moaad Sep 23 '20 at 09:29
  • this queue will be removed - no need to have it in between – Moaad Sep 23 '20 at 09:30
  • In the tests, I am copying the files to a queue dir (the name is misleading; it should be copyFilesToQueueDir), so that when the flow moves the files to the error/processed dir, I still have the original files in the resources dir. – Moaad Sep 23 '20 at 09:31
  • OK. so, please, try to extract debug logs to understand how your message travel. Probably it would be great to provide the whole JUnit report when it fails. So, we would see which test fails. – Artem Bilan Sep 23 '20 at 14:41

0 Answers0