The ZooKeeper support in Spring Integration is documented at https://docs.spring.io/spring-integration/reference/html/zookeeper.html, but that chapter is vague.

It suggests adding the bean below, but gives no details on how to start or stop a poller when the node is granted leadership.

@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/siTest/")
                .setRole("cluster");
}

Is there an example showing how to ensure that the poller below runs on only one node in the cluster at any time, using ZooKeeper?

@Component
public class EventsPoller {

    public void pullEvents() {
        //pull events should be run by only one node in the cluster at any time
    }
}
suman j

1 Answer

The LeaderInitiator emits an OnGrantedEvent when it becomes leader and an OnRevokedEvent when its leadership is revoked.

See https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#endpoint-roles and the following section, https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#leadership-event-handling, for more information about handling those events and how they affect components assigned to a particular role.

I agree, though, that the ZooKeeper chapter should link to the SmartLifecycleRoleController chapter. Feel free to raise a JIRA issue on the matter; contributions are welcome!
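To make the grant/revoke idea concrete, here is a minimal plain-Java sketch with no Spring or ZooKeeper dependency. It is not the role-based mechanism described above: it only models a poller that does work while a leadership flag is set. The class name `LeaderGatedPoller` and the methods `onGranted`, `onRevoked` and `pullCount` are hypothetical; in a real application the two callbacks would presumably be invoked from listeners for OnGrantedEvent and OnRevokedEvent.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: a poller that only works while this node is leader.
public class LeaderGatedPoller {

    // Flipped by the leadership callbacks below; false until leadership is granted.
    private final AtomicBoolean leader = new AtomicBoolean(false);

    // Counts how many polls actually did work (stand-in for real event pulling).
    private final AtomicInteger pulls = new AtomicInteger();

    // Assumption: called when this node is granted leadership.
    public void onGranted() {
        leader.set(true);
    }

    // Assumption: called when this node's leadership is revoked.
    public void onRevoked() {
        leader.set(false);
    }

    // May be scheduled on every node; returns true only if work was done.
    public boolean pullEvents() {
        if (!leader.get()) {
            return false; // not the leader, skip this cycle
        }
        pulls.incrementAndGet(); // placeholder for the real pull logic
        return true;
    }

    public int pullCount() {
        return pulls.get();
    }

    public static void main(String[] args) {
        LeaderGatedPoller poller = new LeaderGatedPoller();
        poller.pullEvents();   // skipped: leadership not granted yet
        poller.onGranted();
        poller.pullEvents();   // runs
        poller.pullEvents();   // runs
        poller.onRevoked();
        poller.pullEvents();   // skipped: leadership revoked
        System.out.println("pulls while leader: " + poller.pullCount()); // prints "pulls while leader: 2"
    }
}
```

With a flag like this the scheduler keeps firing on every node and only the leader does work, whereas assigning the endpoint to a role stops and starts the endpoint itself.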

UPDATE

This is what I did in our test:

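import static org.junit.Assert.assertNotNull;

import java.util.function.Supplier;

import org.apache.curator.framework.CuratorFramework;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Role;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.zookeeper.ZookeeperTestSupport;
import org.springframework.integration.zookeeper.config.LeaderInitiatorFactoryBean;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)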
@DirtiesContext
public class LeaderInitiatorFactoryBeanTests extends ZookeeperTestSupport {

    private static CuratorFramework client;

    @Autowired
    private PollableChannel stringsChannel;

    @BeforeClass
    public static void getClient() throws Exception {
        client = createNewClient();
    }

    @AfterClass
    public static void closeClient() {
        if (client != null) {
            client.close();
        }
    }

    @Test
    public void test() {
        assertNotNull(this.stringsChannel.receive(10_000));
    }


    @Configuration
    @EnableIntegration
    public static class Config {

        @Bean
        public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
            return new LeaderInitiatorFactoryBean()
                    .setClient(client)
                    .setPath("/siTest/")
                    .setRole("foo");
        }

        @Bean
        public CuratorFramework client() {
            return LeaderInitiatorFactoryBeanTests.client;
        }

        @Bean
        @InboundChannelAdapter(channel = "stringsChannel", autoStartup = "false", poller = @Poller(fixedDelay = "100"))
        @Role("foo")
        public Supplier<String> inboundChannelAdapter() {
            return () -> "foo";
        }

        @Bean
        public PollableChannel stringsChannel() {
            return new QueueChannel();
        }

    }

}

The logs then show something like this:

2018-12-14 10:12:33,542 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Starting [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
2018-12-14 10:12:33,578 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Stopping [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
Artem Bilan
  • Do you have an example of a simple poller controlled by ZooKeeper leader election? I tried implementing SmartLifecycle and adding a DefaultCandidate to the leaderInitiator bean, but I still don't see the lifecycle methods being called or controlled by ZooKeeper. – suman j Dec 14 '18 at 09:32
  • Please see the UPDATE in my answer. – Artem Bilan Dec 14 '18 at 15:13