Context

Per the Spring Integration docs https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#using-the-get-command, the GET command on the SFTP outbound gateway with the STREAM option returns an input stream for the remote file whose path is sent to the input channel.

We could configure an integration flow similar to the recommendation at
https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#configuring-with-the-java-dsl-3

@Bean
public QueueChannelSpec remoteFileOutputChannel() {
    return MessageChannels.queue();
}

@Bean
public IntegrationFlow sftpGetFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),
                        AbstractRemoteFileOutboundGateway.Command.GET, "payload")
                .options(AbstractRemoteFileOutboundGateway.Option.STREAM))
        .channel("remoteFileOutputChannel")
        .get();
}

I plan to obtain the input stream in the caller, similar to the approach shown in the edits to the question "No Messages When Obtaining Input Stream from SFTP Outbound Gateway":

public InputStream openFileStream(final int retryCount, final String filename, final String directory)
        throws Exception {
    InputStream is = null;
    for (int i = 1; i <= retryCount; ++i) {
        if (sftpGetInputChannel.send(MessageBuilder.withPayload(directory + "/" + filename).build(), ftpTimeout)) {
            is = getInputStream();
            if (is != null) {
                break;
            } else {
                logger.info("Failed to obtain input stream so attempting retry " + i + " of " + retryCount);
                Thread.sleep(ftpTimeout);
            }
        }
    }
    return is;
}

private InputStream getInputStream() {
    Message<?> msgs = stream.receive(ftpTimeout);
    if (msgs == null) {
        return null;
    }
    return (InputStream) msgs.getPayload();
}

I would like to pass the input stream to the item reader that is part of a Spring Batch job. The job would read from the input stream and close the stream/session upon completion.

Question

The response from the SFTP outbound gateway is sent to a queue channel. If there are concurrent GET requests to the gateway from multiple jobs/clients, how does each consumer pick the input stream that belongs to its own request from the queue channel's blocking queue? The only solution I could think of:

  • Mark getInputStream as synchronized. This would ensure that only one consumer can send commands to the outbound gateway. Since all we are doing is returning a reference to the input stream, it is not a huge performance bottleneck. We could also set the capacity of the queue channel as an additional measure.

This is not an ideal solution because it is very much possible for other devs to bypass the synchronized method here and interact with the outbound gateway. We run the risk of fetching an incorrect stream.

The underlying SFTP client implementation used by Spring doesn't impose any such restriction, so I am looking for a Spring Integration solution to this problem.

Does GET with STREAM return any headers carrying the input file name from the payload, which the client could use to verify that the stream corresponds to the requested file? This would require peeking into the queue and inspecting the message before removing it. Not ideal, I think.

Is there a way to pass the response queue channel name as a parameter from the caller?

Appreciate any insights.

1 Answer

Yes, simply set the replyChannel header to a new QueueChannel for each request and terminate the flow with the gateway; if there is no output channel, the outbound gateway sends the reply to the channel in the header.

That is similar to how inbound gateways work.
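
To see why a per-request reply channel removes the need for synchronization, here is a minimal plain-JDK model of the pattern. It is only a sketch: it does not use any Spring Integration API, and every name in it (ReplyChannelDemo, Request, startFakeGateway, fetch) is hypothetical. In Spring Integration the per-request queue would be the QueueChannel placed in the replyChannel header, and the worker thread stands in for the outbound gateway.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of "reply channel carried by the request".
// All names are hypothetical; none of this is Spring Integration API.
class ReplyChannelDemo {

    // A request carries its payload plus its own private reply queue.
    static final class Request {
        final String remotePath;
        final BlockingQueue<InputStream> replyQueue;

        Request(String remotePath, BlockingQueue<InputStream> replyQueue) {
            this.remotePath = remotePath;
            this.replyQueue = replyQueue;
        }
    }

    // Shared request queue, standing in for the gateway's input channel.
    static final BlockingQueue<Request> REQUESTS = new LinkedBlockingQueue<>();

    // Stand-in for the outbound gateway: replies on the queue each request carries.
    static Thread startFakeGateway() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request request = REQUESTS.take();
                    byte[] fake = ("contents of " + request.remotePath)
                            .getBytes(StandardCharsets.UTF_8);
                    request.replyQueue.put(new ByteArrayInputStream(fake));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop when interrupted
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Caller side: a fresh reply queue per request, so replies cannot be mixed up.
    static String fetch(String remotePath, long timeoutMillis)
            throws InterruptedException, IOException {
        BlockingQueue<InputStream> replyQueue = new ArrayBlockingQueue<>(1);
        REQUESTS.put(new Request(remotePath, replyQueue));
        InputStream reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (reply == null) {
            return null; // no reply within the timeout
        }
        try (InputStream in = reply) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        startFakeGateway();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            String path = "/inbound/file-" + i + ".csv";
            results.add(pool.submit(() -> fetch(path, 2_000)));
        }
        for (int i = 0; i < 8; i++) {
            String expected = "contents of /inbound/file-" + i + ".csv";
            if (!expected.equals(results.get(i).get())) {
                throw new IllegalStateException("mismatched reply for request " + i);
            }
        }
        pool.shutdown();
        System.out.println("all replies matched their requests");
    }
}
```

Because no two callers ever share a reply queue, there is nothing to peek at or filter, and a caller that bypasses a synchronized wrapper still cannot receive someone else's stream.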

Gary Russell
  • See also a `@MessagingGateway` to hide all of that request-reply burden from your code: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway – Artem Bilan Aug 16 '22 at 17:32
  • Gary - Thank you very much; it's great that we can pass the desired output channel in the request headers. In the bean definition of the outbound gateway flow `sftpGetFlow`, should I set a "null channel" as the output channel, or not set any channel at all? Is there any documentation on which request headers are useful and which response headers we can expect to receive? – tardistraveller Aug 16 '22 at 18:27
  • Just leave it with no channel - if you add `nullChannel()` that becomes the output channel. But, as @ArtemBilan said, use a gateway instead and the framework takes care of all the correlation for you without you having to deal with the low level details, channels, etc. The `replyChannel` header is discussed in the `Service Activator` section. `.handle()` is the DSL's equivalent of that pattern. https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#service-activator – Gary Russell Aug 16 '22 at 18:35
  • Artem - Do you propose the logic below? If so, how do you return the response headers, which contain the session? `public interface MySftpGateway { @MessagingGateway(defaultRequestChannel = "sftpGetInputChannel") InputStream getResourceAsStream(String remoteFilePath); }` – tardistraveller Aug 16 '22 at 18:42
  • Unable to edit - here is what I meant, Artem: `@MessagingGateway(defaultRequestChannel="sftpGetInputChannel") public interface MySftpGateway { @Gateway(requestChannel = "sftpGetInputChannel") InputStream getResourceAsStream(String remoteFilePath); }` – tardistraveller Aug 16 '22 at 18:49
  • `Message` can be used in that case. Although I'd like to understand why you want to receive that `InputStream` yourself rather than using some Spring Batch Integration features to continue the flow via another `handle()`. – Artem Bilan Aug 16 '22 at 18:52
  • 1. An inbound SFTP integration polls the remote server for a new file and starts a Spring Batch job with the file name as a job parameter. 2. The item reader would use the outbound gateway to fetch the input stream - `remoteFileInputStream` `return new FlatFileItemReaderBuilder().name("transactionItemReader") .resource(new InputStreamResource(pgpDecoder.getRawDataAsStream(remoteFileInputStream)))` – tardistraveller Aug 16 '22 at 19:01
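
The gateway suggestion from the comments could look roughly like the fragment below. It is a sketch, not a confirmed implementation: it reuses the MySftpGateway name and the sftpGetInputChannel channel from the comments, assumes the `Message` return type mentioned above means a typed `Message<InputStream>`, and assumes sftpGetFlow ends at the outbound gateway with no `.channel(...)` after it, so that the reply goes back to the caller.

```java
import java.io.InputStream;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

// Declarative request/reply entry point; the framework creates a temporary
// reply channel per call, so concurrent callers do not share a queue.
@MessagingGateway(defaultRequestChannel = "sftpGetInputChannel")
public interface MySftpGateway {

    // Returning Message<InputStream> rather than InputStream keeps the reply
    // headers (and whatever the gateway stores in them) visible to the caller.
    Message<InputStream> getResourceAsStream(String remoteFilePath);
}
```

The SFTP reference chapter linked in the question describes the session for a streamed GET as being stored in the closeableResource header, retrievable with `new IntegrationMessageHeaderAccessor(reply).getCloseableResource()`, so the caller can close both the stream and the session once the Spring Batch reader has finished.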