
I am running a Storm topology that reads data from a Kafka broker and writes into Cassandra. One of my Cassandra bolts performs both read and write operations, and its keyspace is always set dynamically. How can I get a connection to Cassandra using a connection pool?

My stream carries the keyspace name, so I need to insert each tuple's data into the appropriate keyspace dynamically.

1) I tried to get a Cassandra connection from the connection pool inside the execute method, so that every tuple gets its own Cassandra connection. After some time the number of connections reached my thread pool limit of 1024, and after that I got connection timeout errors.

Example:

ExecutorService pool = Executors.newFixedThreadPool(1024);

public void execute(Tuple input) {
  if (input.size() > 0) {
    // A new ConnectionManager (and its pool) is created for every tuple and never closed
    ConnectionManager cm = new ConnectionManager();
    cm.keyspace = "dda400db5ef2d";
    statement = cm.poolRun();
    cql = "select * from columnfamily where id='d78346'";
  }
}

2) I tried to get a connection in the prepare method, when the topology initializes the worker, and keep it as a static connection:

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  _OutputCollector = collector;
  ConnectionManager cm = new ConnectionManager();
  cm.keyspace = "dda400db5ef2d";   // keyspace is fixed once, at startup
  statement = cm.poolRun();
}

public void execute(Tuple input) {
  if (input.size() > 0) {
    cql = "select * from columnfamily where id='d78346'";
  }
}

The second case works if all the data belongs to one keyspace. In my case, however, the data belongs to different keyspaces, and a single topology has to identify the keyspace of each tuple and write into that keyspace.

Is there any hashing mechanism in Storm to hold one connection per keyspace?

Or is there another approach?
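One possible approach, sketched here under stated assumptions, is a plain per-JVM map keyed by keyspace name. `KeyspaceConnections` and `KeyspaceConnection` are hypothetical: `KeyspaceConnection` stands in for whatever `ConnectionManager.poolRun()` returns. Because the map is `static`, every bolt task running in the same worker JVM shares it, and `ConcurrentHashMap.computeIfAbsent` opens each keyspace's connection at most once even when several executor threads ask for it at the same time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class KeyspaceConnections {

    // Hypothetical stand-in for a real Cassandra connection/session object.
    static final class KeyspaceConnection implements AutoCloseable {
        final String keyspace;
        volatile boolean open = true;

        KeyspaceConnection(String keyspace) {
            this.keyspace = keyspace;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    // Counts how many connections were actually opened (for the demo only).
    static final AtomicInteger OPENED = new AtomicInteger();

    // static => one map per worker JVM, shared by every bolt task in that JVM.
    private static final Map<String, KeyspaceConnection> CACHE = new ConcurrentHashMap<>();

    // Returns the cached connection for a keyspace, opening it only on first use.
    static KeyspaceConnection forKeyspace(String keyspace) {
        return CACHE.computeIfAbsent(keyspace, ks -> {
            OPENED.incrementAndGet();
            return new KeyspaceConnection(ks);
        });
    }

    // Closes and forgets every cached connection, e.g. when the worker shuts down.
    static void closeAll() {
        CACHE.values().forEach(KeyspaceConnection::close);
        CACHE.clear();
    }

    public static void main(String[] args) {
        // Keyspace names as they might arrive in successive tuples.
        String[] tupleKeyspaces = {"dda400db5ef2d", "other_ks", "dda400db5ef2d", "other_ks"};
        for (String ks : tupleKeyspaces) {
            KeyspaceConnection conn = forKeyspace(ks);
            System.out.println("tuple for " + ks + " uses connection for " + conn.keyspace);
        }
        System.out.println("opened: " + OPENED.get()); // prints "opened: 2"
    }
}
```

Four tuples spread over two keyspaces open only two connections. If the connection type is the DataStax driver's thread-safe `Session`, a single session without a bound keyspace can serve every keyspace, and the same map could instead cache per-keyspace prepared statements.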

edited by Matthias J. Sax
asked by kannadhasan
  • It looks like you are creating a connection pool for every task. I'm not really that familiar with Cassandra, but you either need to create a connection pool per process and share it between your tasks or use individual connections. In either case it looks like you are not closing your connections or returning them to the pool. – bridiver Jul 12 '14 at 02:24
  • Yes, I am also not closing the connection. But each tuple in my stream may belong to a different keyspace. How do I implement that? – kannadhasan Jul 12 '14 at 05:04
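bridiver's point about returning connections to the pool can be illustrated with a small bounded pool in which each borrowed connection is wrapped in an `AutoCloseable` lease, so that try-with-resources hands it back after every tuple. `BoundedPool`, `Lease`, and the `StringBuilder` placeholder resource are all hypothetical. If `ConnectionManager` wraps the DataStax driver, note that the driver's `Session` already maintains connection pools internally and is meant to be shared rather than borrowed per tuple.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BoundedPool<T> {

    private final BlockingQueue<T> idle;

    // Pre-creates `size` resources with a hypothetical factory (e.g. a connection opener).
    public BoundedPool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // A borrowed resource; closing the lease returns the resource to the pool exactly once.
    public final class Lease implements AutoCloseable {
        private final T resource;
        private boolean returned = false;

        private Lease(T resource) {
            this.resource = resource;
        }

        public T get() {
            return resource;
        }

        @Override
        public void close() {
            if (!returned) {
                returned = true;
                idle.offer(resource);
            }
        }
    }

    // Waits up to the timeout for a free resource instead of creating a new one.
    public Lease borrow(long timeout, TimeUnit unit) throws InterruptedException {
        T r = idle.poll(timeout, unit);
        if (r == null) {
            throw new IllegalStateException("pool exhausted: was a lease not closed?");
        }
        return new Lease(r);
    }

    public int available() {
        return idle.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedPool<StringBuilder> pool = new BoundedPool<>(2, StringBuilder::new);
        for (int tuple = 0; tuple < 1000; tuple++) {
            // try-with-resources guarantees the resource goes back after each tuple.
            try (BoundedPool<StringBuilder>.Lease lease = pool.borrow(1, TimeUnit.SECONDS)) {
                lease.get().setLength(0);
            }
        }
        System.out.println("available after 1000 tuples: " + pool.available()); // prints 2
    }
}
```

With only two pooled resources, 1000 tuples are processed without ever creating more; forgetting to close a lease would instead surface as the "pool exhausted" error after the timeout.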

2 Answers


I'm not familiar with Cassandra, but I think this is what you want:

private ConnectionManager cm = null;

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  _OutputCollector = collector;
  cm = new ConnectionManager();   // created once per bolt task
}

public void execute(Tuple input) {
  cm.keyspace = "dda400db5ef2d";  // keyspace can change from tuple to tuple
  statement = cm.poolRun();
  cql = "select * from columnfamily where id='d78346'";
}

The prepare method runs once per bolt task, when the task is initialized and before any tuple reaches execute. You can use it to initialize your variables and connections and then use them in your execute method. You will probably want to close your connection once your query is complete and reset it on the next query. Hope this helps.

answered by Naresh
  • I believe this will still set up a connection manager per task, as opposed to a single connection manager shared by all tasks. – bridiver Jul 12 '14 at 12:15
  • I still have a problem. The error is java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at java.util.concurrent.ThreadPoolExecutor.addWor – kannadhasan Jul 14 '14 at 05:14
  • Is there any method in Storm that is called once a task is complete, so that I can close the connection? – kannadhasan Jul 14 '14 at 06:19
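Storm's `IBolt` interface declares a `cleanup()` method alongside `prepare()` and `execute()`. Its documentation warns that `cleanup()` is not guaranteed to run on a cluster, because worker processes may be killed outright; it is only reliably called when a topology is killed in local mode. The sketch below models that lifecycle without depending on Storm: `CassandraBolt` only mirrors the method names (the real signatures take Storm types), and `SessionHolder` is a hypothetical reference-counted, per-JVM holder, so the first task to prepare opens the shared session and the last task to clean up closes it.

```java
import java.util.ArrayList;
import java.util.List;

public class BoltLifecycleSketch {

    // Hypothetical stand-in for a Cassandra Cluster/Session shared by all tasks in one JVM.
    static final class SharedSession {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    // Reference-counted holder: the first acquire opens the session, the last release closes it.
    static final class SessionHolder {
        private static SharedSession session;
        private static int users = 0;

        static synchronized SharedSession acquire() {
            if (session == null) {
                session = new SharedSession();
            }
            users++;
            return session;
        }

        static synchronized void release() {
            if (users > 0 && --users == 0) {
                session.close();
                session = null;
            }
        }
    }

    // Mirrors the names of Storm's IBolt methods, not their real signatures.
    static final class CassandraBolt {
        private SharedSession session;

        void prepare() {
            session = SessionHolder.acquire();
        }

        void execute(String keyspace, String id) {
            // A real bolt would query keyspace.table with the shared session here.
        }

        void cleanup() {
            SessionHolder.release();
        }

        SharedSession session() {
            return session;
        }
    }

    public static void main(String[] args) {
        List<CassandraBolt> tasks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            CassandraBolt bolt = new CassandraBolt();
            bolt.prepare();
            tasks.add(bolt);
        }
        SharedSession shared = tasks.get(0).session();
        tasks.forEach(CassandraBolt::cleanup);
        System.out.println("closed after last cleanup: " + shared.closed); // prints true
    }
}
```

Because `cleanup()` may never run on a real cluster, the code should still tolerate connections being dropped when the worker process dies, rather than relying on `cleanup()` alone.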

Do the following:

1) Create not just the Cassandra cluster but also a session in the prepare method of your bolt. Do not specify a keyspace when creating the session.

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    // The session is not bound to any keyspace, so it can reach all of them
    this.session = client.getSession();
}

2) Now build your query from your data, prefixing the table name with the keyspace name (keyspace.table), so that a single session can reach every keyspace.

public void execute(Tuple input) {
  if (input.size() > 0) {
    // Fully qualified table name: keyspace.table
    cql = "select * from keyspace.table where id='d78346'";
    session.execute(cql);
  }
}
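Because the keyspace name comes from the stream and is concatenated into the CQL text, it is worth validating it first. The sketch below is a hypothetical helper (`KeyspaceQuery.selectById` is not part of any library) that accepts only unquoted names made of letters, digits and underscores, up to 48 characters (an assumed, conservative rule), and leaves the id as a `?` bind marker instead of pasting it into the string.

```java
import java.util.regex.Pattern;

public class KeyspaceQuery {

    // Conservative check for unquoted CQL names: letters, digits, underscores, 1 to 48 characters.
    private static final Pattern NAME = Pattern.compile("[A-Za-z0-9_]{1,48}");

    private static String checkName(String name) {
        if (name == null || !NAME.matcher(name).matches()) {
            throw new IllegalArgumentException("invalid CQL name: " + name);
        }
        return name;
    }

    // Builds a keyspace-qualified query; the id stays a bind marker (?) so it is
    // supplied as a parameter rather than concatenated into the CQL text.
    static String selectById(String keyspace, String table) {
        return "SELECT * FROM " + checkName(keyspace) + "." + checkName(table) + " WHERE id = ?";
    }

    public static void main(String[] args) {
        // In a bolt, the keyspace would come from the tuple rather than a literal.
        System.out.println(selectById("dda400db5ef2d", "columnfamily"));
        // prints: SELECT * FROM dda400db5ef2d.columnfamily WHERE id = ?
    }
}
```

With the DataStax Java driver, the resulting string could be run as `session.execute(cql, "d78346")`, or passed to `session.prepare(cql)` once per keyspace and cached. Reading the keyspace from the tuple (for example with `input.getStringByField("keyspace")`) assumes the spout declares a field with that name.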