I have an event queue with n message listeners. When a message arrives, one listener takes it and runs a new instance of a state machine. The problem is that although multiple messages are handled in parallel, the state machines execute their actions sequentially, even though the actions are invoked by different state machine instances, as you can see here:

2017-10-18 16:11:03.740  INFO 30282 --- [lTaskExecutor-1] o.s.s.support.LifecycleObjectSupport     : started org.springframework.statemachine.support.DefaultStateMachineExecutor@6ddb1ea6
2017-10-18 16:11:03.741  INFO 30282 --- [lTaskExecutor-1] o.s.s.support.LifecycleObjectSupport     : started EVALUATE_IS_WALKTHROUGH SAVE END START  / START / uuid=b922b6b1-a441-4924-8531-d45e0e0c9c40 / id=null
2017-10-18 16:11:03.740  INFO 30282 --- [TaskExecutor-10] o.s.s.support.LifecycleObjectSupport     : started org.springframework.statemachine.support.DefaultStateMachineExecutor@13b6ace4
2017-10-18 16:11:03.741  INFO 30282 --- [TaskExecutor-10] o.s.s.support.LifecycleObjectSupport     : started EVALUATE_IS_WALKTHROUGH SAVE END START  / START / uuid=e06a8c1d-beed-41c6-bc63-d8c1a3a56169 / id=null
2017-10-18 16:11:03.759  INFO 30282 --- [pool-5-thread-1] i.b.b.e.processors.actions.SaveAction    : [io.botbit.backend.events.processors.actions.SaveAction@607e4071] Saving event Event[id=null,     
2017-10-18 16:11:24.046  INFO 30282 --- [pool-5-thread-1] i.b.b.e.processors.actions.SaveAction    : [io.botbit.backend.events.processors.actions.SaveAction@607e4071] Saving event Event[id=null, 
2017-10-18 16:11:44.058  INFO 30282 --- [pool-5-thread-1] i.b.b.e.p.a.EvaluateIsWalkthroughAction  : Evaluation is WT,,,
2017-10-18 16:11:44.059  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped org.springframework.statemachine.support.DefaultStateMachineExecutor@6ddb1ea6
2017-10-18 16:11:44.060  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped EVALUATE_IS_WALKTHROUGH SAVE END START  /  / uuid=b922b6b1-a441-4924-8531-d45e0e0c9c40 / id=null
2017-10-18 16:11:44.060  INFO 30282 --- [pool-5-thread-1] i.b.b.e.p.a.EvaluateIsWalkthroughAction  : Evaluation is WT,,,
2017-10-18 16:11:44.061  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped org.springframework.statemachine.support.DefaultStateMachineExecutor@13b6ace4
2017-10-18 16:11:44.061  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped EVALUATE_IS_WALKTHROUGH SAVE END START  /  / uuid=e06a8c1d-beed-41c6-bc63-d8c1a3a56169 / id=nul

I think this happens because every action runs on the same thread ([pool-5-thread-1]). I need every instance to run fully in parallel: not only the state machines themselves, but also their actions.

Any help will be appreciated, thanks!

@Component
public class EventConsumer {
    private final static Logger logger = Logger.getLogger(EventConsumer.class);
    @Autowired
    private StateMachineFactory<String, String> eventProcessorFactory;

    public void consume(Event event) {
        logger.info("Received <" + event + ">");
        StateMachine<String, String> eventProcessor = eventProcessorFactory.getStateMachine();
        eventProcessor.getExtendedState().getVariables().put("event", event);
        eventProcessor.start();
        eventProcessor.sendEvent(CommonEvents.SUCCESS);
    }

    public void consume(Collection<Event> events) {
        for (Event event : events) {
            this.consume(event);
        }
    }
}

And this is the state machine configuration:

@Configuration
@EnableStateMachineFactory
public class WiFiConnectEventProcessorConfig extends StateMachineConfigurerAdapter<String, String> {

    @Autowired
    SaveAction saveAction;
    @Autowired
    DeprecateAddVisitationAction addVisitation;
    @Autowired
    EvaluateIsWalkthroughAction isWTAction;

    @Override
    public void configure(StateMachineStateConfigurer<String, String> states) throws Exception {
        states.withStates().initial(WiFiConnectStates.START).state(WiFiConnectStates.SAVE, saveAction)
                .state(WiFiConnectStates.DEPRECATE_ADD_VISITATION, addVisitation)
                .state(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH, isWTAction).end(WiFiConnectStates.END);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<String, String> transitions) throws Exception {
        transitions.withExternal().source(WiFiConnectStates.START).target(WiFiConnectStates.SAVE)
                .event(CommonEvents.SUCCESS)
                .and().withExternal().source(WiFiConnectStates.SAVE)
                .target(WiFiConnectStates.DEPRECATE_ADD_VISITATION).event(CommonEvents.SUCCESS)
                .and().withExternal()
                .source(WiFiConnectStates.DEPRECATE_ADD_VISITATION).target(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH)
                .event(CommonEvents.SUCCESS)
                .and().withExternal().source(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH)
                .target(WiFiConnectStates.END).event(CommonEvents.SUCCESS);
    }
}
  • The documentation does indicate this as a current limitation, see [13.1.1 Adapter Factory Limitations](https://docs.spring.io/spring-statemachine/docs/2.0.3.RELEASE/reference/htmlsingle/#adapter-factory-limitations). If you haven't already done so by now, try using the [State Machine via Builder](https://docs.spring.io/spring-statemachine/docs/2.0.3.RELEASE/reference/htmlsingle/#state-machine-via-builder). – Matt Jan 17 '19 at 20:52

2 Answers

1

You need to create your own TaskScheduler and configure it as follows in your state machine @Configuration class. Choose a pool size that fits your needs.

    @Override
    public void configure(StateMachineConfigurationConfigurer<TaskState, TaskEvent> config) throws Exception {
        config.withConfiguration()
                .taskScheduler(myTaskScheduler());
    }

    @Bean
    public TaskScheduler myTaskScheduler() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        return scheduler;
    }
0

As mentioned in the comment, your state machines are going to share the same resources, including the TaskExecutor & TaskScheduler.

The default behavior is to use a single-thread executor in the scheduler, which is why you see a bottleneck in your application.
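
The effect of a single-threaded scheduler can be reproduced with plain JDK classes. The sketch below is only an illustration and does not use Spring Statemachine: `SchedulerPoolDemo`, `threadsUsed`, and the 200 ms sleep are made-up names and values, and `Executors.newScheduledThreadPool` stands in for whatever scheduler the state machines share. It submits several slow tasks and records which threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spring or of the question's code.
final class SchedulerPoolDemo {

    // Schedules `tasks` slow tasks on a scheduler with `poolSize` threads and
    // returns the names of the threads that actually executed them.
    static Set<String> threadsUsed(int poolSize, int tasks) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                scheduler.schedule(() -> {
                    names.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(200); // stand-in for a slow action such as a save
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                }, 0, TimeUnit.MILLISECONDS);
            }
            done.await(); // wait until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // One scheduler thread: every task is queued behind the previous one.
        System.out.println("pool of 1 used " + threadsUsed(1, 4).size() + " thread(s)");
        // Several scheduler threads: the tasks can overlap.
        System.out.println("pool of 4 used " + threadsUsed(4, 4).size() + " thread(s)");
    }
}
```

With a pool of one, every task reports the same thread name, which matches the single `pool-5-thread-1` in the question's log. With a larger pool the tasks should be spread over several threads.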

https://github.com/spring-projects/spring-framework/blob/master/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java#L90

This explains the various types of executors and schedulers you can use: https://docs.spring.io/spring/docs/4.2.x/spring-framework-reference/html/scheduling.html

Then you can update your config:

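  @Override
  public void configure(StateMachineConfigurationConfigurer<RfqState, RfqEvent> config)
      throws Exception {
    // SyncTaskExecutor runs each task on the calling thread.
    SyncTaskExecutor executor = new SyncTaskExecutor();
    // Example scheduler with several threads; the pool size of 10 is only illustrative.
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(10);
    scheduler.initialize(); // required because this scheduler is not a Spring-managed bean
    config.withConfiguration()
        .taskExecutor(executor)
        .taskScheduler(scheduler);
  }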
0x26res