I am using WatchServiceDirectoryScanner / RecursiveLeafOnlyDirectoryScanner to process files on the file system. File events are generated and messages are received at the defined endpoint, but sometimes not all files in the directory are processed.
For example, if there are 15 files, sometimes 10 are processed and sometimes only 5, even though the metadata store has entries for all 15 files in "metadata-store.properties".
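One way to confirm the mismatch is to list which CSV files in the directory already have an entry in "metadata-store.properties" and compare that list with the "BatchJobScheduler Running" log lines. The following is only a minimal sketch: the class name MetadataStoreCheck and its method are hypothetical helpers, not part of Spring Integration, and it assumes that the persistent filter stores each file under its absolute path as the key (the prefix is empty in the configuration below).

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical diagnostic helper; it is not part of Spring Integration.
final class MetadataStoreCheck {

    // Returns the *.csv files in dataDir whose absolute path appears as a key
    // in the given properties file (assumption: key = absolute file path).
    static List<Path> recordedCsvFiles(Path dataDir, Path storeFile) throws IOException {
        Properties store = new Properties();
        try (InputStream in = Files.newInputStream(storeFile)) {
            store.load(in);
        }
        List<Path> recorded = new ArrayList<>();
        try (DirectoryStream<Path> csvFiles = Files.newDirectoryStream(dataDir, "*.csv")) {
            for (Path csv : csvFiles) {
                if (store.containsKey(csv.toAbsolutePath().toString())) {
                    recorded.add(csv);
                }
            }
        }
        return recorded;
    }

    // Usage: java MetadataStoreCheck <data-dir> <path-to-metadata-store.properties>
    public static void main(String[] args) throws IOException {
        Path dataDir = Paths.get(args[0]);
        Path storeFile = Paths.get(args[1]);
        List<Path> recorded = recordedCsvFiles(dataDir, storeFile);
        System.out.println(recorded.size() + " CSV file(s) are recorded in " + storeFile);
        recorded.forEach(System.out::println);
    }
}
```

Any file that this prints but that never shows up in the application log was accepted by the filter without reaching the service activator.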
spring-integration-configuration.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<int:annotation-config />
<int:channel id="cfpFileIn"></int:channel>
<int-file:inbound-channel-adapter id="cfpFileIn"
directory="${cfp.flight.data.dir}" auto-startup="true" scanner="csvDirScanner">
<int:poller fixed-delay="${cfp.flight.data.dir.polling.delay}"></int:poller>
</int-file:inbound-channel-adapter>
<bean id="csvDirScanner"
class="org.springframework.integration.file.WatchServiceDirectoryScanner">
<constructor-arg index="0" value="${cfp.flight.data.dir}" />
<property name="filter" ref="csvCompositeFilter" />
<property name="autoStartup" value="true" />
</bean>
<bean id="csvCompositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean
class="org.springframework.integration.file.filters.SimplePatternFileListFilter">
<constructor-arg value="*.csv" />
</bean>
<ref bean="persistentFilter" />
</list>
</constructor-arg>
</bean>
<bean id="persistentFilter"
class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStore" />
<constructor-arg index="1" name="prefix" value="" />
<property name="flushOnUpdate" value="true" />
</bean>
<bean name="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore">
<property name="baseDirectory" value="${metadata.dir}"></property>
</bean>
</beans>
BatchJobScheduler:
import java.io.File;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Component
public class BatchJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(BatchJobScheduler.class);
@Autowired
protected JobLauncher jobLauncher;
@Autowired
@Qualifier(value = "job")
private Job job;
@ServiceActivator(inputChannel = "cfpFileIn")
public void run(File file) {
String fileName = file.getAbsolutePath();
logger.info("BatchJobScheduler Running #################"+fileName);
JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", fileName).toJobParameters();
try {
JobExecution execution = jobLauncher.run(job,
jobParameters);
logger.info(" BatchJobScheduler Exit Status : " + execution.getStatus() +"::"+execution.getAllFailureExceptions());
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
logger.error(" BatchJobScheduler Exit Status : Exception ::",e);
}
}
}
It seems I am missing some configuration or code.
Based on suggestions:
I have placed the complete spring-integration.xml and the log file in a Git repo: https://github.com/chandaku/spring-integration-issue.
Five channels are created there and they work simultaneously. I think that when I run only one channel at a time, all files in the directory are processed successfully, but when I enable all the file channels, the problem occurs.