I am running a Storm topology that reads data from a Kafka broker and writes it into Cassandra. One of my Cassandra bolts performs both read and write operations, and the keyspace is always set dynamically. I want to obtain Cassandra connections through a connection pool.
Each tuple in my stream carries a keyspace name, and I need to insert its data into the matching keyspace.
1) I tried to get a Cassandra connection from the connection pool inside the execute method, so that each tuple gets its own connection. After some time I hit my 1024-connection thread pool limit, followed by connection timeout errors.
Example:
ExecutorService pool = Executors.newFixedThreadPool(1024);

public void execute(Tuple input) {
    if (input.size() > 0) {
        // A new connection per tuple eventually exhausts the pool
        ConnectionManager cm = new ConnectionManager();
        cm.keyspace = "dda400db5ef2d";
        statement = cm.poolRun();
        cql = "select * from columnfamily where id='d78346'";
    }
}
2) I tried to get a single static connection in the prepare method, when the topology initializes the worker:
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    _OutputCollector = collector;
    // One connection per worker, created once at startup
    ConnectionManager cm = new ConnectionManager();
    cm.keyspace = "dda400db5ef2d";
    statement = cm.poolRun();
}

public void execute(Tuple input) {
    if (input.size() > 0) {
        cql = "select * from columnfamily where id='d78346'";
    }
}
The second approach works when all the data belongs to one keyspace. In my case, however, the data belongs to different keyspaces, and this single topology has to identify the keyspace and write into it.
Is there any hashing method available in Storm to hold per-keyspace connections, or is there any other logic I could use?
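What I have in mind is something like a cache keyed by keyspace name, so each keyspace gets exactly one shared connection instead of one per tuple. This is only a sketch: `Session` here is a hypothetical stand-in for whatever my `ConnectionManager.poolRun()` actually returns, and `forKeyspace` is a name I made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the real driver session / connection object
class Session {
    final String keyspace;
    Session(String keyspace) { this.keyspace = keyspace; }
}

public class KeyspaceSessionCache {
    // One cached Session per keyspace, shared by all tuples the bolt processes
    private static final Map<String, Session> SESSIONS = new ConcurrentHashMap<>();

    // Returns the cached Session for a keyspace, creating it on first use only
    static Session forKeyspace(String keyspace) {
        return SESSIONS.computeIfAbsent(keyspace, Session::new);
    }

    public static void main(String[] args) {
        Session a = forKeyspace("dda400db5ef2d");
        Session b = forKeyspace("dda400db5ef2d");
        Session c = forKeyspace("otherKeyspace");
        System.out.println(a == b);          // same keyspace -> same cached connection
        System.out.println(a == c);          // different keyspace -> separate connection
        System.out.println(SESSIONS.size()); // only as many connections as keyspaces
    }
}
```

The bolt would call `forKeyspace(tuple keyspace)` inside execute, so the connection count is bounded by the number of distinct keyspaces rather than the number of tuples. I am not sure whether this is the idiomatic way to do it in Storm, though.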