I'm implementing an IBackingMap for my Trident topology to store tuples in ElasticSearch (I know there are several existing Trident/ElasticSearch integrations on GitHub, but I've decided to implement a custom one that suits my task better).

So my implementation is a classic one with a factory:

// imports assuming Storm 0.9.x and an ElasticSearch 1.x transport client
import java.util.Map;

import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import storm.trident.state.OpaqueValue;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.map.CachedMap;
import storm.trident.state.map.IBackingMap;
import storm.trident.state.map.MapState;
import storm.trident.state.map.OpaqueMap;
import storm.trident.state.map.SnapshottableMap;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff (including LOCAL_CACHE_SIZE and GLOBAL_KEY)...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                // local cache to cut down on reads against ElasticSearch
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                // opaque map gives exactly-once semantics with an opaque spout
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) {

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        // TODO add a possibility to close the client
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }

    // the actual implementation is left out
}

As you can see, it takes host/port/cluster name as input params and creates an ElasticSearch client as a member of the class, BUT IT NEVER CLOSES THE CLIENT.

It is then used from within a topology in a pretty familiar way:

tridentTopology.newStream("spout", spout)
            // ...some processing steps here...
            .groupBy(aggregationFields)
            .persistentAggregate(
                    ElasticSearchBackingMap.getFactoryFor(
                            ElasticSearchConfig.ES_HOST,
                            ElasticSearchConfig.ES_PORT,
                            ElasticSearchConfig.ES_CLUSTER_NAME
                    ),
                    new Fields(FieldNames.OUTCOME),
                    new BatchAggregator(),
                    new Fields(FieldNames.AGGREGATED));

This topology is wrapped in a public static void main, packed into a jar and submitted to Storm for execution.
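
For illustration, a minimal sketch of what that main could look like (the class name TopologyRunner, the topology name and the worker count are assumptions made up for the sketch, not part of the original code; the submission call is the standard Storm 0.9.x API):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import storm.trident.TridentTopology;

public class TopologyRunner {

    public static void main(String[] args) throws Exception {

        TridentTopology tridentTopology = new TridentTopology();
        // ...the newStream(...)/persistentAggregate(...) wiring shown above...

        Config conf = new Config();
        conf.setNumWorkers(2); // illustrative; pick whatever fits your cluster

        // submits the topology packed in this jar to the cluster
        StormSubmitter.submitTopology("es-aggregation", conf, tridentTopology.build());
    }
}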

The question is: should I worry about closing the ElasticSearch connection, or is it Storm's own business? If it is not done by Storm, how and when in the topology's lifecycle should I do that?

Thanks in advance!

  • TransportClient should be a singleton for each storm worker: [user-list](http://elasticsearch-users.115913.n3.nabble.com/What-is-your-best-practice-to-access-a-cluster-by-a-Java-client-td4015311.html). Actually, I think you don't need to close the Java client, because a Storm topology should never stop. (A sketch of such a per-worker singleton follows these comments.) – fhussonnois May 13 '15 at 13:06
  • 1
    A hack could be: create a singleton on each worker e.g. when creating first state and close this singleton in the clean-up method of your aggregator - I see `BatchAggregator` in your code. But I would also like to see a better solution... – dedek Jul 23 '15 at 12:48
  • See also this feature request: https://issues.apache.org/jira/browse/STORM-49 – dedek Jul 27 '15 at 06:56
  • Thanks @dedek for the hack! Now I'm implementing a State that sends events to Kafka with the newest version of the Kafka Producer API, and it _must_ be closed or there's a risk of events being lost. BatchAggregator in my code implements ReducerAggregator, which has no cleanup(). But there is a cleanup() in the standard Function, which makes it possible to implement something like a CloseConnectionFunction [facepalm]. The question is whether killing a topology triggers that cleanup(); I've never seen it triggered on a LocalCluster... Tomorrow I will try with a real cluster and see... – bopcat Aug 04 '15 at 20:31
  • @bopcat You are probably right about the missing clean-up trigger. See this post: http://qnalist.com/questions/5082578/cleanup-method-not-called-for-the-basebasicbolt-when-the-topology-is-killed Then hopefully a JVM shutdown hook or `finalize()` could do the trick... Please post any working solution if you find one... – dedek Aug 05 '15 at 05:52
  • 1
    @dedek please see my answer below. Of course not the best one so let's hope some day STORM-49 becomes implemented. – bopcat Aug 05 '15 at 14:42
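
For reference, a minimal hypothetical sketch of the per-worker singleton mentioned in the comments above (the class name TransportClientHolder and the lazy-initialization scheme are assumptions, not code from the question; the ElasticSearch calls are the same 1.x API used there):

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

// one TransportClient per worker JVM, shared by all state instances
// instead of each ElasticSearchBackingMap opening its own connection
public final class TransportClientHolder {

    private static Client client;

    private TransportClientHolder() {}

    public static synchronized Client get(String host, int port, String clusterName) {
        if (client == null) {
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", clusterName).build();
            client = new TransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(host, port));
        }
        return client;
    }

    public static synchronized void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}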

1 Answer

Okay, answering my own question.

First of all, thanks again @dedek for the suggestions and for reviving the ticket in Storm's Jira.

Finally, since there's no official way to do that, I've decided to go with the cleanup() method of Trident's Filter. So far I've verified the following (for Storm v0.9.4):

With LocalCluster

  • cleanup() gets called on the cluster's shutdown
  • cleanup() DOESN'T get called when the topology is killed; this shouldn't be a tragedy, as one is unlikely to use LocalCluster for real deployments anyway

With a real cluster

  • it gets called when the topology is killed as well as when the worker is stopped using pkill -TERM -u storm -f 'backtype.storm.daemon.worker'
  • it doesn't get called if the worker is killed with kill -9 or when it crashes or - sadly - when the worker dies due to an exception

Overall that gives a more or less decent guarantee of cleanup() getting called, provided you are careful with exception handling (I tend to add 'thundercatches' to every one of my Trident primitives anyway).
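
A minimal illustration of such a 'thundercatch', assuming a simple pass-through function (the class SafeFunction and its body are made up for the example, not part of the original code):

import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// wraps the whole body in a catch-all so a failing tuple gets logged
// instead of crashing the worker, which would skip cleanup() entirely
public class SafeFunction extends BaseFunction {

    private static final Logger LOG = LoggerFactory.getLogger(SafeFunction.class);

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try {
            // the actual processing would go here
            collector.emit(new Values(tuple.getString(0)));
        } catch (Throwable t) {
            LOG.error("Swallowed exception to keep the worker alive", t);
        }
    }
}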

My code:

import java.io.Closeable;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.operation.Filter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class CloseFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);

    private final Closeable[] closeables;

    public CloseFilter(Closeable... closeables) {
        this.closeables = closeables;
    }

    // a pass-through: the filter never drops tuples, it only exists for cleanup()
    @Override
    public boolean isKeep(TridentTuple tuple) {
        return true;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }

    // called on graceful worker shutdown (with the caveats listed above)
    @Override
    public void cleanup() {
        for (Closeable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                LOG.warn("Failed to close an instance of {}", c.getClass(), e);
            }
        }
    }
}
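
Wiring the filter into a topology could look something like this (a hypothetical fragment reusing the names from the question; closeables stands for whatever resources need closing; keep in mind that the filter and its constructor arguments get serialized on submission, so live connections would more realistically be obtained in prepare(), e.g. from a per-worker singleton like the one sketched in the comments):

Stream stream = tridentTopology.newStream("spout", spout)
        // ...some processing steps here...
        .each(new Fields(FieldNames.OUTCOME), new CloseFilter(closeables));
// then groupBy/persistentAggregate as shown in the question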

However, it would be nice if some day hooks for closing connections became part of the API.
