
Please suggest how to solve the issue below, or suggest a different approach to achieve my goal. I receive data from an external source and insert it into Cassandra on a daily basis; then I need to retrieve a whole week of data from Cassandra, do some processing, and insert the results back into Cassandra.

I have a lot of records, and each record executes most of the operations below. Following the suggestion on my previous post, Repreparing preparedstatement warning, I tried to keep a map of query string to prepared statement to avoid re-preparing the same statement.

I wrote the following Spark Scala program. I validated the Cassandra host details from cqlsh and I am able to connect there, but when I try through the program I get an error.

class StatementCache {

  val acluster = CassandraUtils.initialize(nodes, user, pass, cassport, racdc)
  val session = acluster.connect("keyspacename");

  val statementCache: ConcurrentHashMap[String, PreparedStatement] = new ConcurrentHashMap

  def getStatement(cql: String): BoundStatement = {
    var ps: PreparedStatement = statementCache.get(cql);
    if (ps == null) {
      ps = session.prepare(cql);
      statementCache.put(cql, ps);
    }
    return ps.bind();
  }
}


object CassandraUtils {
  println("##########entered cassandrutils")
   //val st=new STMT();
  private val psCache: StatementCache = new StatementCache();
 val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
  val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and  summ_dt= ? and t_summ_id=?; "

  def insert(session: Session, data: TripHistoryData, batch: BatchStatement) {
    batch.add(psCache.getStatement(insertQuery));
  }

  def update(session: Session, data: TripHistoryData, batch: BatchStatement) {
    batch.add(psCache.getStatement(updateQuery));
  }

  def initialize(clusterNodes: String, uid: String, pwd: String, port: Int, racdc: String): Cluster = {

    val builder = Cluster.builder().addContactPoints(InetAddress.getByName(clusterNodes))
      .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(
          DCAwareRoundRobinPolicy.builder() //You can directly use the DCaware without TokenAware as well
            .withLocalDc(racdc) //This is case sensitive as defined in rac-dc properties file
            //.withUsedHostsPerRemoteDc(2) //Try at most 2 remote DC nodes in case all local nodes are dead in the current DC
            //.allowRemoteDCsForLocalConsistencyLevel()
            .build()))

    if (StringUtils.isNotEmpty(uid)) {
      builder.withCredentials(uid, pwd)
    }

    val cluster: Cluster = builder.build()
    cluster
  }
}

-----------------------------------------------------------------------------------------------------------------

I am getting the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ExceptionInInitializerError
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:91)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:45)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host hostname1: Host hostname1 requires authentication, but no authenticator found in Cluster configuration
    at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:261)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:243)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutorService.execute(MoreExecutors.java:299)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1.run(Futures.java:632)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)
    at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1288)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more

3 Answers


You need to provide authentication in your Cluster instance: .withCredentials(username.trim(), password.trim())
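
For example, a minimal sketch of how that call fits into the initialize method from the question (uid and pwd are the parameters already passed to initialize; treat this as illustrative, not a verified drop-in fix):

    // Sketch only: same builder as in the question, but with credentials applied
    // unconditionally instead of behind the StringUtils.isNotEmpty check.
    val builder = Cluster.builder()
      .addContactPoints(InetAddress.getByName(clusterNodes))
      .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
      .withCredentials(uid.trim(), pwd.trim()) // must be applied before build()

    val cluster: Cluster = builder.build()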

Alternatively, you can disable authentication at the Cassandra level by changing the authenticator key to AllowAllAuthenticator in cassandra.yaml.

Note: changing the yaml requires a Cassandra restart.

– Laxmikant

I resolved the issue by placing the lines below inside the getStatement method instead of outside the method:

val acluster = CassandraUtils.initialize(nodes,user,pass, cassport,racdc)

val session = acluster.connect("keyspacename");
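
Roughly, the getStatement method then looks like this (a sketch only, reusing the identifiers from the question; note that this opens a new Cluster and Session on every call, which is expensive, but it means the connection is created on the executor instead of being serialized from the driver):

    def getStatement(cql: String): BoundStatement = {
      // Connection is now created inside the method, i.e. on the executor.
      val acluster = CassandraUtils.initialize(nodes, user, pass, cassport, racdc)
      val session = acluster.connect("keyspacename")
      var ps: PreparedStatement = statementCache.get(cql)
      if (ps == null) {
        ps = session.prepare(cql)
        statementCache.put(cql, ps)
      }
      ps.bind()
    }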
– shantha ramadurga

Your problem is that you're trying to perform "manual" management of the connection. This doesn't work with Spark: the Cluster/Session instances would have to be shipped to the executors, and that won't work correctly because these instances were created on the driver. You can of course use the "typical" pattern of doing the work inside foreachPartition, etc., as described in this question.
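
For completeness, a rough sketch of that foreachPartition pattern, reusing the identifiers from the question (rdd stands in for whatever RDD holds the records to be written):

    rdd.foreachPartition { records =>
      // The connection is opened on the executor, once per partition, instead of
      // being created on the driver and shipped with the closure.
      val cluster = CassandraUtils.initialize(nodes, user, pass, cassport, racdc)
      val session = cluster.connect("keyspacename")
      try {
        records.foreach { rec =>
          // prepare/bind/execute the insert or update for this record here
        }
      } finally {
        session.close()
        cluster.close()
      }
    }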

The best way to work with Cassandra from Spark is to use the Spark Cassandra Connector: it will automatically spread the load between nodes and perform inserts and updates correctly. In this case you configure the connection parameters, including authentication, via Spark properties (spark.cassandra.auth.username & spark.cassandra.auth.password). More information about connecting is in the documentation.
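
A hedged sketch of what that could look like (the column names come from the queries in the question, but the host, credentials, column types and the processing step are placeholders):

    import com.datastax.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}

    // Connection and authentication are configured via Spark properties,
    // so no manual Cluster/Session handling is needed.
    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", "hostname1")     // placeholder host
      .set("spark.cassandra.auth.username", "cassandra_user")  // placeholder credentials
      .set("spark.cassandra.auth.password", "cassandra_pass")
    val sc = new SparkContext(conf)

    // Read a week of data; the predicate mirrors selectQuery from the question.
    val weekRows = sc.cassandraTable("k1", "table1")
      .where("s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?",
             sId, aId, summTyp, weekStart, weekEnd)

    // Process the rows and write the results back; the connector prepares and
    // batches the statements itself. The column types here are assumptions.
    val results = weekRows.map { row =>
      (row.getString("s_id"), row.getString("a_id"), row.getString("summ_typ"),
       row.getDate("summ_dt"), row.getString("t_summ_id"), row.getDouble("dist"))
    }
    results.saveToCassandra("k1", "table1",
      SomeColumns("s_id", "a_id", "summ_typ", "summ_dt", "t_summ_id", "dist"))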

– Alex Ott
  • Alex, I tried using the Spark Cassandra Connector but I am confused. Can you please check this link https://stackoverflow.com/q/52225041/10246547 and tell me where I am going wrong? – shantha ramadurga Sep 07 '18 at 14:54