
I am not sure if my endpoints are set up correctly. My end goal is that once I call stop on both adapters, a file placed in either directory polled by the inbound channel adapters will not be processed. That is not what happens: files are still being polled and processed.

Currently, when the code runs, from the very start, 6 Task Schedulers are launched:

2015-08-03 18:12:40,011 [task-scheduler-6] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-5] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-8] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-1] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-3] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-1] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

When I issue a stop

@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop()

on one of the adapters, I am left with 5 Task Schedulers. Here is the stop command being issued:

2015-08-03 18:15:28,392 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'control'
2015-08-03 18:15:28,392 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - preSend on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]
2015-08-03 18:15:28,392 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321] received message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]
2015-08-03 18:15:28,407 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'
2015-08-03 18:15:28,407 [http-8080-1] INFO  org.springframework.integration.endpoint.SourcePollingChannelAdapter - stopped mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter
2015-08-03 18:15:28,407 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321]' produced no reply for request Message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]
2015-08-03 18:15:28,407 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]

And here are the resulting Task Schedulers after the first stop command; there are only 5 now:

2015-08-03 18:15:30,017 [task-scheduler-7] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-1] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-6] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-10] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-2] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

When I issue the second stop on the second adapter

@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop()

Here is the command:

2015-08-03 18:21:49,652 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'control'
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - preSend on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321] received message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'
2015-08-03 18:21:49,668 [http-8080-1] INFO  org.springframework.integration.endpoint.SourcePollingChannelAdapter - stopped mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321]' produced no reply for request Message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]

I am left with 4 Task Schedulers:

2015-08-03 18:23:40,014 [task-scheduler-2] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:23:40,014 [task-scheduler-6] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:23:40,014 [task-scheduler-5] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:23:40,014 [task-scheduler-7] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

Here is the code for the MecFilePoller class:

@Configuration
@PropertySource("classpath:mec.properties") 
@EnableIntegration
@IntegrationComponentScan
public class MecFilePoller {

private Job job;

private String fileParameterName;

@Autowired
MecProperty mecProperty;

@Bean
MessageChannel control() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel="control")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

public void setFileParameterName(String fileParameterName) {
    this.fileParameterName = fileParameterName;
}

public void setJob(Job job) {
    this.job = job;
}


@Bean
@InboundChannelAdapter(value = "ssnInboundFileChannel", poller = @Poller(cron="${mec/SSN_POLLER}"))
public MessageSource<File> fileMessageSourceSSN() {
     System.out.println("SSN POLLING...");
     FileReadingMessageSource source = initialSetUp();
     source.setDirectory(new File(mecProperty.getProperty(MecConstants.SSN_SFTP_WORKING_DIR)));
     System.out.println("enter fileMessageSource in following dir....." + mecProperty.getProperty(MecConstants.SSN_SFTP_WORKING_DIR));
     return source;
}

@Bean
@InboundChannelAdapter(value = "mdwInboundFileChannel", poller = @Poller(cron="${mec/MDW_POLLER}"))
public MessageSource<File> fileMessageSourceMDW() {
    System.out.println("MDW POLLING...");
    FileReadingMessageSource source = initialSetUp();
    source.setDirectory(new File(mecProperty.getProperty(MecConstants.MDW_SFTP_WORKING_DIR)));
    System.out.println("enter fileMessageSource in following dir....." + mecProperty.getProperty(MecConstants.MDW_SFTP_WORKING_DIR));
    return source;
}

private FileReadingMessageSource initialSetUp() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<File>(); 
    SimplePatternFileListFilter simplePatternFileListFilter = new SimplePatternFileListFilter("*.done");
    AcceptOnceFileListFilter<File> acceptOnceFileListFilter = new AcceptOnceFileListFilter<File>();

    compositeFileListFilter.addFilter(simplePatternFileListFilter);
    compositeFileListFilter.addFilter(acceptOnceFileListFilter);

    source.setFilter(compositeFileListFilter);
    return source;
}

@Transformer(inputChannel="mdwInboundFileChannel",outputChannel="mdwOutboundJobRequestChannel")
public JobLaunchRequest toRequestMDW(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(INPUT_FILE_NAME, message.getPayload().getAbsolutePath());
    return new JobLaunchRequest((Job) appContext.getBean("mecmdwJob"), jobParametersBuilder.toJobParameters());
}    

@Transformer(inputChannel="ssnInboundFileChannel",outputChannel="ssnOutboundJobRequestChannel")
public JobLaunchRequest toRequestSSN(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(INPUT_FILE_NAME, message.getPayload().getAbsolutePath());
    return new JobLaunchRequest((Job) appContext.getBean("mecssnJob"), jobParametersBuilder.toJobParameters());
}    

public void commandTypePollerRequest(String command, String type) {
    if(command.equals("stop") && type.equals("mdw")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop()"));
    }
    else if(command.equals("stop") && type.equals("ssn")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop()"));
    }
    else if(command.equals("start") && type.equals("mdw")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.start()"));
    }
    else if(command.equals("start") && type.equals("ssn")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.start()"));
    }
}

}

Here is the MECPollerController class:

@Controller
public class MECPollerController {

@Autowired
MecFilePoller mecFilePoller;

@Autowired
private ApplicationContext appContext;

@Autowired
MessageChannel ssnOutboundJobRequestChannel;

@Autowired
MessageChannel mdwOutboundJobRequestChannel;

@RequestMapping(value = "ui/stopMdwPoller.action", method = RequestMethod.GET)
public void stopMdwPollerRequest() {
    mecFilePoller.commandTypePollerRequest("stop","mdw");
}

@RequestMapping(value = "ui/stopSsnPoller.action", method = RequestMethod.GET)
public void stopSsnPollerRequest() {
    mecFilePoller.commandTypePollerRequest("stop","ssn");
}

@RequestMapping(value = "ui/startMdwPoller.action", method = RequestMethod.GET)
public void startMdwPollerRequest() {
    mecFilePoller.commandTypePollerRequest("start","mdw");
}

@RequestMapping(value = "ui/startSsnPoller.action", method = RequestMethod.GET)
public void startSsnPollerRequest() {
    mecFilePoller.commandTypePollerRequest("start","ssn");
}

}

Here is the XML config for the one Spring Batch job, job-mecmdw.xml:

<int:channel id="mdwInboundFileChannel" />
<int:channel id="mdwOutboundJobRequestChannel" />
<int:channel id="mdwJobLaunchReplyChannel" />
<int:annotation-config />       
<bean class="org.springframework.integration.core.MessagingTemplate"></bean>

<!--    THIS WAS MOVED TO MecFilePoller
<int:transformer input-channel="mdwInboundFileChannel" output-channel="mdwOutboundJobRequestChannel">
    <bean
        class="org.batch.poller.MecFilePoller">
        <property name="job" ref="mecmdwJob" />
        <property name="fileParameterName" value="input.file.name" />
    </bean>
</int:transformer>
-->
<batch-int:job-launching-gateway request-channel="mdwOutboundJobRequestChannel" reply-channel="mdwJobLaunchReplyChannel" />
<int:logging-channel-adapter channel="mdwJobLaunchReplyChannel" />

Here is the XML config for the other Spring Batch job, job-mecssn.xml:

<int:annotation-config />           
<int:channel id="ssnInboundFileChannel" />
<int:channel id="ssnOutboundJobRequestChannel" />
<int:channel id="ssnJobLaunchReplyChannel" />   

<!--    THIS WAS MOVED TO MecFilePoller
<int:transformer input-channel="ssnInboundFileChannel"
    output-channel="ssnOutboundJobRequestChannel">
    <bean
        class="org.batch.poller.MecFilePoller">
        <property name="job" ref="mecssnJob" />
        <property name="fileParameterName" value="input.file.name" />
    </bean>
</int:transformer>
-->

<batch-int:job-launching-gateway request-channel="ssnOutboundJobRequestChannel" reply-channel="ssnJobLaunchReplyChannel" />
<int:logging-channel-adapter channel="ssnJobLaunchReplyChannel" />  
Martijn Pieters
Tim Schumacher

1 Answer


Inbound channel adapters consist of at least two beans; in this case, a MessageSource and a SourcePollingChannelAdapter.

In your example, the message source gets the specified bean name (filePoller). The channel adapter's name is generated, and will be

filePoller.fileMessageSource.inboundChannelAdapter

(@Configuration class bean name + method name + type)

We have an open JIRA issue to document these generated bean names.

Since the name is dotted, you'll need to quote it when using the control bus:

@'filePoller.fileMessageSource.inboundChannelAdapter'.stop()
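
The naming rule above can be sketched in plain Java. This is only an illustration, not Spring Integration code: `AdapterBeanNames`, `adapterBeanName` and `controlBusCommand` are hypothetical helpers, and the sketch assumes the `@Configuration` class keeps its default bean name, which as far as I know Spring derives with `java.beans.Introspector.decapitalize`.

```java
import java.beans.Introspector;

public class AdapterBeanNames {

    // Generated adapter name: config class bean name + "." + @Bean method name + ".inboundChannelAdapter".
    // ASSUMPTION: the config class has no explicit bean name, so its name is the
    // decapitalized simple class name.
    static String adapterBeanName(String configSimpleClassName, String beanMethodName) {
        return Introspector.decapitalize(configSimpleClassName)
                + "." + beanMethodName + ".inboundChannelAdapter";
    }

    // The name contains dots, so it has to be quoted in the control bus expression.
    static String controlBusCommand(String adapterBeanName, String action) {
        return "@'" + adapterBeanName + "'." + action + "()";
    }

    public static void main(String[] args) {
        String name = adapterBeanName("FilePoller", "fileMessageSource");
        System.out.println(name);
        System.out.println(controlBusCommand(name, "stop"));
    }
}
```

One thing to watch: `Introspector.decapitalize` leaves a name unchanged when its first two characters are both upper case, so for unusual class names it is safer to read the real bean name from the DEBUG bean-creation log than to guess it.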

EDIT

I just tested this and it works just fine...

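// Static imports assumed here: JUnit 4 and Mockito.
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.ExpressionControlBusFactoryBean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@Configuration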
@EnableIntegration
public class Foo {

    @Bean
    QueueChannel inboundFileChannel() {
        return new QueueChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "inboundFileChannel", poller = @Poller(fixedDelay="1000"))
    public MessageSource<?> fileMessageSource() {
        MessageSource<?> source = mock(MessageSource.class);
        return source;
    }

    @Bean
    MessageChannel control() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel="control")
    public ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Foo.class);
        Thread.sleep(2000);
        ctx.getBean("control", MessageChannel.class).send(
                new GenericMessage<String>("@'foo.fileMessageSource.inboundChannelAdapter'.stop()"));
        assertFalse(ctx.getBean(SourcePollingChannelAdapter.class).isRunning());
        Thread.sleep(2000);
        ctx.close();
    }
}

EDIT2 :

I put some sleeps in my example above and see this...

2015-08-03 12:23:34,528 [task-scheduler-1] DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 12:23:35,530 [task-scheduler-2] DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 12:23:35,615 [main] DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'control'
2015-08-03 12:23:35,616 [main] DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'control', message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,616 [main] DEBUG: org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@1e0b4072] received message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,617 [main] DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'foo.fileMessageSource.inboundChannelAdapter'
2015-08-03 12:23:35,618 [main] INFO : org.springframework.integration.endpoint.SourcePollingChannelAdapter - stopped foo.fileMessageSource.inboundChannelAdapter
2015-08-03 12:23:35,618 [main] DEBUG: org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@1e0b4072]' produced no reply for request Message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,618 [main] DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'control', message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,618 [main] DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'foo.fileMessageSource.inboundChannelAdapter'

(no polls after the stop).
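
Note that the two polls before the stop in the log above ran on different threads (`task-scheduler-1` and `task-scheduler-2`) even though there is only one adapter. The thread name tells you which pool thread ran a poll, not which endpoint was polled. Here is a plain `java.util.concurrent` analogy, not Spring code: `SharedSchedulerDemo` and `runsAfterCancel` are made-up names and the 50 ms period is arbitrary. Several periodic tasks share one 10-thread pool, and cancelling some of them leaves the rest running while the pool itself stays up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedSchedulerDemo {

    // Schedules `tasks` periodic jobs on ONE pool of 10 threads, cancels the first
    // `cancelled` of them, and returns how many times each job ran after that.
    static int[] runsAfterCancel(int tasks, int cancelled) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
        AtomicInteger[] counters = new AtomicInteger[tasks];
        ScheduledFuture<?>[] futures = new ScheduledFuture<?>[tasks];
        for (int i = 0; i < tasks; i++) {
            AtomicInteger counter = counters[i] = new AtomicInteger();
            futures[i] = scheduler.scheduleAtFixedRate(
                    counter::incrementAndGet, 0, 50, TimeUnit.MILLISECONDS);
        }
        sleep(200);
        // Analogous to stopping individual endpoints: the pool is not touched.
        for (int i = 0; i < cancelled; i++) {
            futures[i].cancel(false);
        }
        sleep(100); // let any run that was already in flight finish
        int[] before = new int[tasks];
        for (int i = 0; i < tasks; i++) {
            before[i] = counters[i].get();
        }
        sleep(300);
        int[] delta = new int[tasks];
        for (int i = 0; i < tasks; i++) {
            delta[i] = counters[i].get() - before[i];
        }
        scheduler.shutdownNow();
        return delta;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] delta = runsAfterCancel(6, 2);
        for (int i = 0; i < delta.length; i++) {
            System.out.println("task " + i + " runs after cancel: " + delta[i]);
        }
    }
}
```

With 6 tasks and 2 cancelled, only the first two counters stop moving; the remaining four keep firing on whichever pool threads happen to be free.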

Gary Russell
  • Thanks. But when I changed it to control.send(new GenericMessage("@'filePoller.fileMessageSource.inboundChannelAdapter'.stop()")), I get the error org.springframework.expression.spel.SpelEvaluationException: EL1058E:(pos 1): A problem occurred when trying to resolve bean 'filePoller.fileMessageSource.inboundChannelAdapter':'Could not resolve bean reference against BeanFactory'. Essentially, NoSuchBeanDefinitionException: No bean named 'filePoller.fileMessageSource.inboundChannelAdapter' is defined – Tim Schumacher Aug 03 '15 at 15:31
  • Try removing the name from the bean, or change it - or rename the configuration class; the configuration class `FilePoller` gets bean name `filePoller`. It might be causing the issue. – Gary Russell Aug 03 '15 at 15:36
  • Thanks, Gary. I did as you said. Updated my class to be MECFilePoller and my bean name to be (different from class name) mdwFilePoller. My file message source method is now fileMessageSourceMDW, too. My send command to control bus is now: control.send(new GenericMessage("@'mdwFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop()")). Unfortunately, I still get exception No bean named 'mdwFilePoller.fileMessageSourceMDW.inboundChannelAdapter' is defined – Tim Schumacher Aug 03 '15 at 15:51
  • If your class is `MECFilePoller`, the bean name is `mECFilePoller` not `mdw...`. I just pasted a test that works fine into my answer. – Gary Russell Aug 03 '15 at 15:57
  • Great that worked. Though I thought it would immediately stop polling? I am still seeing polling messages: SourcePollingChannelAdapter - Received no Message during the poll, returning 'false', even after having called stop. – Tim Schumacher Aug 03 '15 at 16:21
  • Maybe you have another polled adapter? I just posted the log from my test; compare it to yours. – Gary Russell Aug 03 '15 at 16:28
  • For me, I notice that it stops ONE of the task schedulers, the other task schedulers still poll -- even if I keep on issuing STOP. In your DEBUG statements I noticed you had TWO task schedulers --were both stopped by one stop command? – Tim Schumacher Aug 03 '15 at 23:29
  • There is only one scheduler with 10 threads by default. If you see activity after the stop you must have other endpoints. The scheduler is not stopped, the endpoint is. If you can't figure it out post your complete configuration and a complete debug log someplace. – Gary Russell Aug 03 '15 at 23:39
  • Thanks, Gary. I would need your help. I have updated the question with my code and debug statements. I really appreciate the help. – Tim Schumacher Aug 04 '15 at 01:30
  • Hi Gary, does it make sense to add @MessageEndpoint on the MecFilePoller class or is MecFilePoller already implicitly a MessageEndpoint? I don't think I have any other endpoints in my project. – Tim Schumacher Aug 04 '15 at 18:56
  • Again, there are not 6 schedulers, there's one scheduler with (default) 10 threads. Your first snippet clearly shows you have 6 pollable endpoints in your context (they are all polled in the same millisecond) and you are only stopping 2 of them. Perhaps you are loading the context multiple times? If you turn on DEBUG logging, look at the bean creation logging. This `System.out.println("SSN POLLING...");` is pretty useless; that method (creating the bean definition) is invoked once only, during initialization. Try adding a breakpoint in the SPCA poller to determine the bean name of the `source` – Gary Russell Aug 05 '15 at 00:12
  • Thanks, Gary. I figured it out. Once I commented out my two redundant in the XMLs -- I had the annotated Transformer anyways in the MecFilePoller class. Doing that gave me the correct number of pollable endpoints -- 2 instead of 6. – Tim Schumacher Aug 05 '15 at 06:32
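
For anyone debugging the same symptom: as the comments above point out, the number of active pollable endpoints is the number of poll lines logged at the same instant, not the number of distinct `task-scheduler-N` names. A rough sketch of that count over a captured DEBUG log follows. `PollCounter` and `pollsPerTimestamp` are hypothetical names, and the sketch assumes the log layout shown in the question (timestamp, then `[thread]`) and that all adapters fire on the same trigger.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PollCounter {

    // Text that identifies an empty poll in the DEBUG output shown in the question.
    static final String POLL_MARKER =
            "SourcePollingChannelAdapter - Received no Message during the poll";

    // Groups poll lines by their timestamp (everything before " [thread]") and counts them.
    static Map<String, Integer> pollsPerTimestamp(List<String> logLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            int bracket = line.indexOf(" [");
            if (bracket < 0 || !line.contains(POLL_MARKER)) {
                continue; // not a poll line in the assumed layout
            }
            counts.merge(line.substring(0, bracket), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String tail = "] DEBUG org.springframework.integration.endpoint."
                + POLL_MARKER + ", returning 'false'";
        // Thread names copied from the first log excerpt in the question;
        // task-scheduler-1 appears twice, yet there are six polls.
        List<String> lines = Arrays.asList(
                "2015-08-03 18:12:40,011 [task-scheduler-6" + tail,
                "2015-08-03 18:12:40,011 [task-scheduler-5" + tail,
                "2015-08-03 18:12:40,011 [task-scheduler-8" + tail,
                "2015-08-03 18:12:40,011 [task-scheduler-1" + tail,
                "2015-08-03 18:12:40,011 [task-scheduler-3" + tail,
                "2015-08-03 18:12:40,011 [task-scheduler-1" + tail);
        System.out.println(pollsPerTimestamp(lines));
    }
}
```

If the count per timestamp is higher than the number of adapters you think you declared, something is registering the endpoints more than once, which is what turned out to be the case here.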