2

Sometimes No Messages When Obtaining Input Stream from SFTP Outbound Gateway

This is follow up question to Use SFTP Outbound Gateway to Obtain Input Stream

The problem I was having in previous question appears that I was not closing the stream as shown in the int:service-activator. However, when I added the int:service-activator then I was seemed to be forced to add int:poller.

However, when I added the int:poller I have noticed that sometimes now when attempting to obtain the stream the messages are null. I have found that a workaround is to simply retry. I have tested with different files and it seems that small files are adversely affected and large files are not. So, if I had to guess, there must be a race condition where the int:service-activator is closing the session before I try call getInputStream() but I was hoping someone could explain if this is what is actually going on and if there is a better solution than just simply retrying?

Thanks!

Here is the outbound gateway configuration:

<int-ftp:outbound-gateway session-factory="ftpClientFactory"
          request-channel="inboundGetStream" command="get" command-options="-stream"
          expression="payload" remote-directory="/" reply-channel="stream">
   </int-ftp:outbound-gateway>

   <int:channel id="stream">
          <int:queue/>
   </int:channel>

   <int:poller default="true" fixed-rate="50" />

   <int:service-activator input-channel="stream"
                 expression="payload.toString().equals('END') ? headers['file_remoteSession'].close() : null" />

Here is the source where I obtain the InputStream:

   public InputStream openFileStream(final int retryCount, final String filename, final String directory)
                 throws Exception {
          InputStream is = null;
          for (int i = 1; i <= retryCount; ++i) {
                 if (inboundGetStream.send(MessageBuilder.withPayload(directory + "/" + filename).build(), ftpTimeout)) {
                       is = getInputStream();
                       if (is != null) {
                              break;
                       } else {
                              logger.info("Failed to obtain input stream so attempting retry " + i + " of " + retryCount);
                              Thread.sleep(ftpTimeout);
                       }
                 }
          }
          return is;
   }

   private InputStream getInputStream() {

          Message<?> msgs = stream.receive(ftpTimeout);

          if (msgs == null) {
                 return null;
          }

          InputStream is = (InputStream) msgs.getPayload();
          return is;
   }

Update, I’ll go ahead and accept the only answer as it helped just enough to find the solution.

The answer to the original question accepted answer was confusing because it answered a java question with an xml configuration solution that while explained the problem didn’t really provide the necessary java technical solution. This follow up question/answer clarifies what is going on within spring-integration and sort of suggests what is necessary to solve.

Final solution. To obtain and save the stream for later, I had to create a bean to save the stream for later reference. This stream is obtained from the message header.

Note, error checking and getter/setter is left out for brevity:

  1. Use the same xml config as in the question above but eliminate the poller and service-activator elements as they are unnecessary and were causing the errors.

  2. Create a new class SftpStreamSession to hold necessary references:

    public class SftpStreamSession {
    
        private Session<?> session;
        private InputStream inputStream;
    
        public void close() {
            inputStream.close();
            session.close();
        }
    }
    
  3. Change the openFileStream method to return an SftpStreamSession:

    public SftpStreamSession openFileStream(final String filename, final String directory) throws Exception {
    
        SftpStreamSession sss = null;
        if (inboundGetStream.send(MessageBuilder.withPayload(directory + "/" + filename).build(), ftpTimeout)) {
    
            Message<?> msgs = stream.receive(ftpTimeout);
    
            InputStream is = (InputStream) msgs.getPayload();
    
            MessageHeaders mH = msgs.getHeaders();
            Session<?> session = (Session<?>) mH.get("file_remoteSession");
    
            sss = new SftpStreamSession(session, is);
        }
    
        return sss;
    }
    
Community
  • 1
  • 1
feblock352
  • 81
  • 1
  • 8

1 Answers1

0

First of all you don't need payload.toString().equals('END') because it looks like you don't use <int-file:splitter> in your logic.

Second. You don't need that ugly <service-activator> because you have full access to the message in your code. You can simply obtain that file_remoteSession, cast it into Session<?> and call its .close() in the end of your logic.

Yes, there is a race condition, but it happens in your code.

Look, you have stream QueueChannel. From the beginning you had one consumer stream.receive(ftpTimeout);. But now you have introduced that <int:service-activator input-channel="stream">. Therefore one more competition consumer. Having such a small (fixed-rate="50") polling interval indeed leads you to unexpected behavior.

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Thanks for help but I'm having trouble interpolating the docs to do what I want. So you mean that I do not need the `` nor the `` in xml, but can somehow obtain `file_remoteSession` directly in my `getInputStream` method from the `Message>` after I get the `InputStream`? – feblock352 Jun 14 '16 at 14:57
  • So it seems I can obtain the `file_remoteSession` from the message header. Can I just call close once I obtain the input stream in `getInputStream` method? – feblock352 Jun 14 '16 at 15:14
  • ??? You can if you want. But is that correct? You should close it when you'll have done with `InputStream` at all. This one is fully similar to standards when you deal with files, for example. So, you should process data from that stream and only after that close the session. Otherwise you won't be able to process stream after closing. – Artem Bilan Jun 14 '16 at 15:20