1

I have some basic code that uses a prepared statement in a for loop and writes the result into a Cassandra Table with some throttling using a semaphore.

  Session session = null;
  try {
    session = connector.openSession();
  } catch( Exception ex ) {
    //  .. moan and complain..
    System.err.printf("Got %s trying to openSession - %s\n", ex.getClass().getCanonicalName(), ex.getMessage() );
  }
  if( session != null ) {

// Prepared Statement for Cassandra Inserts
        PreparedStatement statement = session.prepare(
                                "INSERT INTO model.base " +
                                "(channel, " +
                                "time_key, " +
                                "power" +
                                ") VALUES (?,?,?);");
        BoundStatement boundStatement = new BoundStatement(statement); 


//Query Cassandra Table that has capital letters in the column names        
        ResultSet results = session.execute("SELECT \"Time_Key\",\"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\" limit 800000;");

 // Get the Variables from each Row of Cassandra Data        
       for (Row row : results){
           // Upper Case Column Names in Cassandra
           time_key = row.getLong("Time_Key");
           start_frequency = row.getDouble("Start_Frequency");
           power = row.getFloat("Power");
           bandwidth = row.getDouble("Bandwidth");


// Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.
                for(channel = 1.6000E8; channel <= channel_end; channel+=increment ){       
                    if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {

                  ResultSetFuture rsf =  session.executeAsync(boundStatement.bind(channel,time_key,power));  
                       backlogList.add( rsf );   // put the new one at the end of the list
                       if( backlogList.size() > 10000 ) {      // wait till we have a few

                           while( backlogList.size() > 5432 ) {      // then harvest about half of the oldest ones of them

                               rsf = backlogList.remove(0);

                               rsf.getUninterruptibly();

                           }    // end while

                       }  // end if

                    }  // end if

                }  // end for

  } // end "row" for

 } // end session

My connection is built with the following:

public static void main(String[] args) {
if (args.length != 2) {
    System.err.println("Syntax: com.neutronis.Spark_Reports <Spark Master URL> <Cassandra contact point>");
    System.exit(1);
}

SparkConf conf = new SparkConf();
conf.setAppName("Spark Reports");
conf.setMaster(args[0]);
conf.set("spark.cassandra.connection.host", args[1]);

Spark_Reports app = new Spark_Reports(conf);

app.run();
}

With this code im attempting to use a semaphore but my Cassandra Cluster still seems to get overloaded and kick out the error:

ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)

It seems odd that it says no host was tried.

I've looked at other semaphore throttling issues such as this and this and attempted to apply to my code above but am still getting the error.

Community
  • 1
  • 1
mithrix
  • 542
  • 1
  • 4
  • 21

1 Answers1

2

Read my answer to this question for how to back-pressure when using asynchronous calls: What is the best way to get backpressure for Cassandra Writes?

Community
  • 1
  • 1
doanduyhai
  • 8,712
  • 27
  • 26
  • I see the article and it appears interesting do you have some reference code for a visual guide to follow for implementation? – mithrix Feb 16 '16 at 19:58
  • Unfortunately not. I've seen this pattern implemented many times on some projects but their source code is not open-sourced :( – doanduyhai Feb 16 '16 at 21:35