I'm currently using Spring Integration 4.1.0 with Spring 4.1.2. I need to read a file line by line and use each line as a message. Basically I want to allow "replay" for one of our message sources, but the messages are not saved in individual files; they are all in a single file. I have no transaction requirements for this use case. My requirements are similar to this posting, except that the file resides on the same server as the JVM: spring integration - read a remote file line by line
As I see it I have the following options:
1. Use int-file:inbound-channel-adapter to read the file, then "split" that file so that one message becomes multiple messages.
Sample config file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
The problem is that the file is very large. With the above technique the entire file is first read into memory and only then split, so the JVM runs out of heap space. The steps actually required are: read a line and convert it to a message, send the message, remove the message from memory, repeat.
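To make that loop concrete, here is a minimal plain-Java sketch of what I mean, with no Spring classes involved and assuming Java 8. The class name LineStreamer and the handler callback are hypothetical names of mine; in a real flow the handler would be whatever sends the line to a channel.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.function.Consumer;

// Hypothetical helper: streams a source one line at a time so that only the
// current line (plus the reader's buffer) is held in memory.
final class LineStreamer {

    // Hands each line to the handler as soon as it is read and returns the
    // number of lines processed. The reader is closed when done.
    static long forEachLine(Reader source, Consumer<String> handler) throws IOException {
        long count = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                handler.accept(line); // "send message"
                count++;              // the line becomes garbage once the handler returns
            }
        }
        return count;
    }
}
```

For a file on disk this could be called as LineStreamer.forEachLine(new java.io.FileReader("/tmp/myfile.txt"), line -> { /* send */ }), reusing the path from the config above.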
2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically indicates to read from the start of the file). Start and stop this adapter as needed for each file (changing the filename before each start).
Sample config file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:\p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" />
<int:channel id="exchangeSpringQueueChannel" />
<task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
</beans>
3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows more fine-grained control over the whole process, but it is a fair amount of work to set up, what with the job repository and such (and I don't care about the job history, so I'd either tell the job not to log status or use the in-memory MapJobRepository).
4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport. Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample, but it needs some work to move the reading into its own Thread so that I honor the stop() command while I read line by line.
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from
* {@link org.springframework.integration.file.tail.ApacheCommonsFileTailingMessageProducer}.
* See <a href="http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter">this forum thread</a>.
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The file you wish to read line by line.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull(file, "'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
// TODO this needs to be moved into its own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}
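For the TODO in doStart(), the threading change I have in mind looks roughly like the following. This is only a plain-Java sketch without the Spring classes, assuming Java 8: StoppableLineReader, its Executor and the sink callback are hypothetical names of mine. In the real adapter the sink would be the send(...) method above and the executor would presumably be a TaskExecutor.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch: reads lines on a separate executor so that start()
// returns immediately and stop() can interrupt the loop between lines.
final class StoppableLineReader {

    private final Executor executor;
    private final Consumer<String> sink;
    private volatile boolean running; // volatile so the reading thread sees stop()

    StoppableLineReader(Executor executor, Consumer<String> sink) {
        this.executor = executor;
        this.sink = sink;
    }

    // Marks the reader as running, then hands the read loop to the executor.
    void start(Reader source) {
        running = true;
        executor.execute(() -> readLines(source));
    }

    // Requests a stop; the loop exits before reading the next line.
    void stop() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    private void readLines(Reader source) {
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            // Check the flag first so a stopped reader does not consume another line.
            while (running && (line = reader.readLine()) != null) {
                sink.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            running = false; // end of file or failure also ends the run
        }
    }
}
```

With an executor such as Executors.newSingleThreadExecutor() the read happens off the calling thread, so a stop() issued via JMX would take effect after the line currently being sent.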
It doesn't seem to me that my use case is outside the realm of typical things people might want to do, so I'm surprised that I can't find an out-of-the-box solution for it. I've searched quite a bit and looked at a lot of the examples, but have yet to find something that suits my needs.
I'm assuming that I've missed something obvious that the framework already offers (though perhaps this falls into the blurry line between Spring Integration and Spring Batch). Can someone let me know if I'm totally off base with my ideas, if there's a simple solution that I've missed, or offer alternative suggestions?