
I am trying to copy data from one Cassandra table to another Cassandra table with the same structure.

Please find the Cassandra table below.

CREATE TABLE data2 (
        d_no text,
        d_type text,
        sn_perc int,
        tse_dt timestamp,
        f_lvl text,
        ign_f boolean,
        lk_loc text,
        lk_ts timestamp,
        mi_rem text,
        nr_fst text,
        perm_stat text,
        rec_crt_dt timestamp,
        sr_stat text,
        sor_query text,
        tp_dat text,
        tp_ts timestamp,
        tr_rem text,
        tr_type text,
        PRIMARY KEY (d_no, d_type)
    ) WITH CLUSTERING ORDER BY (d_type ASC);

Data inserted using:

INSERT INTO data2 (all column names) VALUES ('64FCFCFC','HUM',4,'1970-01-02 05:30:00','NA',true,'NA','1970-01-02 05:40:00','NA','NA','NA','1970-02-01 05:30:00','NA','NA','NA','1970-02-03 05:30:00','NA','NA');

Note: the 4th column is a timestamp. I insert it as '1970-01-02 05:30:00', and in the DataFrame the timestamp also appears correct, but after writing from the DataFrame to Cassandra, select * from the table shows it as 1970-01-02 00:00:00.000000+0000.

The same happens for all the timestamp columns.
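
This looks like a time-zone display issue rather than data loss: Cassandra stores timestamps in UTC, a literal such as '1970-01-02 05:30:00' with no explicit offset is interpreted in the client's local zone (apparently +05:30 here), and the stored value is then printed back in UTC, hence 1970-01-02 00:00:00+0000. A minimal sketch to make Spark parse and display timestamps in that same local zone (Asia/Kolkata is only an assumption based on the 05:30 offset):

// Assumption: the local offset is +05:30 (Asia/Kolkata); adjust to your zone.
// Spark uses this session time zone when parsing and rendering TimestampType,
// so df2.show() lines up with what was typed in cqlsh.
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")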

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
</dependencies>

I want to read these values and write them into another Cassandra table using Spark Scala. See the code below:

val df2 = spark.read
    .format("org.apache.spark.sql.cassandra")
    .option("spark.cassandra.connection.host", "hostname")
    .option("spark.cassandra.connection.port", "9042")
    .option("spark.cassandra.auth.username", "usr")
    .option("spark.cassandra.auth.password", "pas")
    .option("keyspace", "hr")
    .option("table", "data2")
    .load()

val df3 = ...   // some processing on df2
df3.write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .option("spark.cassandra.connection.host", "hostname")
    .option("spark.cassandra.connection.port", "9042")
    .option("spark.cassandra.auth.username", "usr")
    .option("spark.cassandra.auth.password", "pas")
    .option("spark.cassandra.output.ignoreNulls", "true")
    .option("confirm.truncate", "true")
    .option("keyspace", "hr")
    .option("table", "data3")
    .save()
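
Side note on the options above: as far as I understand, confirm.truncate is only honoured together with overwrite mode, so with mode("append") it should have no effect. A minimal sketch, assuming a truncate-then-write of data3 is actually what's intended (connection/auth options as above):

df3.write
    .format("org.apache.spark.sql.cassandra")
    .mode("overwrite")                       // overwrite truncates the target table first
    .option("confirm.truncate", "true")      // acknowledgement the connector requires for overwrite
    .option("keyspace", "hr")
    .option("table", "data3")
    .save()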

But I am getting the error below when I try to insert data using the above code:

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 18 instead of 17.
    at scala.Predef$.require(Predef.scala:224)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Answer


That's a known problem (SPARKC-541): you're copying data from a table that has DSE Search enabled to a table without it, so the source exposes an extra solr_query column. You simply need to drop this column as part of your transformations:

val df3 = df2.drop("solr_query") // ... followed by your transformations
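
As a quick sanity check (a sketch only; the 18-vs-17 figures come straight from the error message above), you can confirm that solr_query is the extra column before writing to data3:

// data2 read through the connector exposes 18 columns (solr_query is added by DSE Search),
// while data3 defines only 17, hence "Invalid row size: 18 instead of 17".
println(df2.columns.length)                  // expected: 18
println(df2.columns.contains("solr_query"))  // expected: true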

Or you can simply use a newer connector (2.3.1 if you're using the OSS driver), or the corresponding DSE release that contains this fix.

  • Hey, thanks Alex. When I drop solr_query it works fine. I am new to Cassandra and did not understand the 2nd option you suggested; can you please elaborate? I added the pom.xml above. – shantha ramadurga Aug 20 '18 at 13:29
  • There are different implementations, depending on what you're using - you can use DSE Analytics, where the Cassandra Connector is already there, and declare it as a `provided` dependency (but your DSE release should have the correct version; I can't say right now in which release it was fixed, check with support), or you can use an external Spark cluster, and then you either can use the DSE BYOS jar, or use the connector as you do above. What DSE version are you using? – Alex Ott Aug 20 '18 at 13:35
  • Do you mean the version below? com.datastax.spark spark-cassandra-connector_2.11 2.3.1 – shantha ramadurga Aug 20 '18 at 13:44
  • No, I mean the version of DSE itself - do you execute your Spark job on DSE, or on standalone Spark? – Alex Ott Aug 20 '18 at 13:45
  • Standalone; locally I have set up one Cassandra instance, and there is another cluster remotely. I am executing locally. – shantha ramadurga Aug 20 '18 at 13:47
  • OK, then your pom.xml should work without the explicit `drop` (if it doesn't work, then it could be that the fix will be released separately, although the bug is in the `RESOLVED` state). – Alex Ott Aug 20 '18 at 14:13
  • OK. Can you please look into the following issue as well: https://stackoverflow.com/q/51947182/10246547 – shantha ramadurga Aug 21 '18 at 11:00
  • Alex, please help me on this issue: https://stackoverflow.com/q/52134514/10246547 - you give great ideas. – shantha ramadurga Sep 03 '18 at 15:28