Sometimes No Messages When Obtaining Input Stream from SFTP Outbound Gateway
This is a follow-up question to "Use SFTP Outbound Gateway to Obtain Input Stream".
The problem in my previous question appears to have been that I was not closing the stream as shown in the int:service-activator. However, once I added the int:service-activator, I seemed to be forced to add an int:poller as well.
After adding the int:poller, I noticed that the message is sometimes null when I attempt to obtain the stream. A workaround is simply to retry. I have tested with different files, and small files seem to be affected while large files are not. My guess is that there is a race condition in which the int:service-activator closes the session before I call getInputStream(). Could someone explain whether this is what is actually going on, and whether there is a better solution than simply retrying?
Thanks!
Here is the outbound gateway configuration:
<int-ftp:outbound-gateway session-factory="ftpClientFactory"
    request-channel="inboundGetStream" command="get" command-options="-stream"
    expression="payload" remote-directory="/" reply-channel="stream">
</int-ftp:outbound-gateway>
<int:channel id="stream">
    <int:queue/>
</int:channel>
<int:poller default="true" fixed-rate="50" />
<int:service-activator input-channel="stream"
    expression="payload.toString().equals('END') ? headers['file_remoteSession'].close() : null" />
Here is the source where I obtain the InputStream:
public InputStream openFileStream(final int retryCount, final String filename, final String directory)
        throws Exception {
    InputStream is = null;
    for (int i = 1; i <= retryCount; ++i) {
        if (inboundGetStream.send(MessageBuilder.withPayload(directory + "/" + filename).build(), ftpTimeout)) {
            is = getInputStream();
            if (is != null) {
                break;
            } else {
                logger.info("Failed to obtain input stream so attempting retry " + i + " of " + retryCount);
                Thread.sleep(ftpTimeout);
            }
        }
    }
    return is;
}

private InputStream getInputStream() {
    Message<?> msgs = stream.receive(ftpTimeout);
    if (msgs == null) {
        return null;
    }
    InputStream is = (InputStream) msgs.getPayload();
    return is;
}
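One reading of the intermittent null from stream.receive(ftpTimeout) is that the polled int:service-activator and the manual receive are both consumers of the same queue channel, so whichever gets there first takes the message. The sketch below illustrates only that queue behavior with plain JDK classes. The BlockingQueue stands in for the "stream" channel and the thread stands in for the poller. These are assumptions for illustration, not Spring Integration's own classes, and the join() forces the poller to win so the outcome is repeatable.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for the queue-backed "stream" channel; plain JDK classes only.
class CompetingConsumersDemo {

    // Queues one message, optionally lets a "poller" thread drain the queue first,
    // then performs the manual receive and returns what it got (null on timeout).
    static String receive(boolean withPoller) throws InterruptedException {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        stream.put("input-stream-message");

        if (withPoller) {
            // Plays the role of the polled service-activator: it takes whatever is queued.
            Thread poller = new Thread(() -> stream.poll());
            poller.start();
            poller.join(); // force the poller to win so the result is repeatable
        }

        // Plays the role of stream.receive(ftpTimeout) in getInputStream().
        return stream.poll(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("with poller:    " + receive(true));
        System.out.println("without poller: " + receive(false));
    }
}
```

With the second consumer present, the manual receive times out and returns null. Without it, the message is always there to be received.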
Update: I'll go ahead and accept the only answer, as it helped just enough to find the solution.
The accepted answer to the original question was confusing because it answered a Java question with an XML configuration solution that, while it explained the problem, didn't really provide the necessary Java solution. This follow-up question and its answer clarify what is going on within spring-integration and suggest what is needed to solve it.
Final solution: to keep the stream for later use, I had to create a bean that holds both the stream and its session. The stream is the message payload, and the session is obtained from the file_remoteSession message header.
Note: error checking and getters/setters are left out for brevity.
Use the same XML config as in the question above, but remove the poller and service-activator elements; they are unnecessary and were causing the errors.
Create a new class SftpStreamSession to hold necessary references:
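public class SftpStreamSession {

    private Session<?> session;
    private InputStream inputStream;

    // Constructor used by openFileStream below
    public SftpStreamSession(Session<?> session, InputStream inputStream) {
        this.session = session;
        this.inputStream = inputStream;
    }

    // Close the stream first, then release the underlying session
    public void close() throws IOException {
        inputStream.close();
        session.close();
    }
}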
Change the openFileStream method to return an SftpStreamSession:
public SftpStreamSession openFileStream(final String filename, final String directory) throws Exception {
    SftpStreamSession sss = null;
    if (inboundGetStream.send(MessageBuilder.withPayload(directory + "/" + filename).build(), ftpTimeout)) {
        Message<?> msgs = stream.receive(ftpTimeout);
        InputStream is = (InputStream) msgs.getPayload();
        MessageHeaders mH = msgs.getHeaders();
        Session<?> session = (Session<?>) mH.get("file_remoteSession");
        sss = new SftpStreamSession(session, is);
    }
    return sss;
}
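Since the caller now owns both the stream and the session, it may help to make the holder AutoCloseable so that try-with-resources releases both. The following is a minimal sketch of that idea, not the code from the solution above. StreamHolder and StreamHolderDemo are hypothetical names, and java.io.Closeable stands in for the Spring Integration Session so the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical holder; Closeable stands in for the Spring Integration Session.
class StreamHolder implements AutoCloseable {
    private final Closeable session;
    private final InputStream inputStream;

    StreamHolder(Closeable session, InputStream inputStream) {
        this.session = session;
        this.inputStream = inputStream;
    }

    InputStream getInputStream() {
        return inputStream;
    }

    @Override
    public void close() throws IOException {
        try {
            inputStream.close();
        } finally {
            session.close(); // runs even if closing the stream throws
        }
    }
}

class StreamHolderDemo {
    // Reads the whole stream; sessionClosed[0] is set when the fake session is closed.
    static String readAll(boolean[] sessionClosed) throws IOException {
        InputStream data = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
        try (StreamHolder holder = new StreamHolder(() -> sessionClosed[0] = true, data)) {
            StringBuilder sb = new StringBuilder();
            int b;
            while ((b = holder.getInputStream().read()) != -1) {
                sb.append((char) b); // ASCII-only test data, so a byte maps to one char
            }
            return sb.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] closed = {false};
        System.out.println(readAll(closed) + ", session closed: " + closed[0]);
    }
}
```

The try/finally in close() is a design choice: if closing the stream fails, the session is still released instead of leaking.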