I am writing a set of methods that extend Spark's RDD API. I have to implement a general method for storing the RDDs, and for a start I tried to wrap spark-cassandra-connector's saveAsCassandraTable, without success.
Here's the "extending RDD's API" part:
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._

object NewRDDFunctions {
  // Implicit conversion that adds the storage methods to any RDD[T].
  implicit def addStorageFunctions[T](rdd: RDD[T]): RDDStorageFunctions[T] =
    new RDDStorageFunctions(rdd)
}

class RDDStorageFunctions[T](rdd: RDD[T]) {
  def saveResultsToCassandra(): Unit = {
    rdd.saveAsCassandraTable("ks_name", "table_name") // this line produces errors!
  }
}
...and importing the object's members with import ...NewRDDFunctions._.
The marked line produces the following errors:
Error:(99, 29) could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[T]
rdd.saveAsCassandraTable("ks_name", "table_name")
^
Error:(99, 29) not enough arguments for method saveAsCassandraTable: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[T], implicit columnMapper: com.datastax.spark.connector.mapper.ColumnMapper[T])Unit.
Unspecified value parameters rwf, columnMapper.
rdd.saveAsCassandraTable("ks_name", "table_name")
^
I don't get why this doesn't work, since saveAsCassandraTable is designed to work on any RDD. Any suggestions?
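Reading the errors, it seems the compiler cannot find RowWriterFactory[T] and ColumnMapper[T] evidence for an unconstrained T. My guess (unverified) is that the wrapper has to forward those implicits from the call site, where T is concrete, roughly like this sketch:

import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.ColumnMapper
import com.datastax.spark.connector.writer.RowWriterFactory

// Sketch: require the evidence saveAsCassandraTable needs as implicit
// parameters, so it gets resolved at the call site where T is concrete.
class RDDStorageFunctions[T](rdd: RDD[T])(implicit
    rwf: RowWriterFactory[T],
    columnMapper: ColumnMapper[T]) {

  def saveResultsToCassandra(): Unit = {
    rdd.saveAsCassandraTable("ks_name", "table_name") // implicits now in scope
  }
}

object NewRDDFunctions {
  // Context bounds desugar to the same implicit parameters and pass
  // them on to the RDDStorageFunctions constructor.
  implicit def addStorageFunctions[T: RowWriterFactory: ColumnMapper](
      rdd: RDD[T]): RDDStorageFunctions[T] =
    new RDDStorageFunctions(rdd)
}

Is forwarding the implicits like this the right approach, or is there a better way?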
I had a similar problem with the example in the spark-cassandra-connector docs:
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
...and the solution was to move the case class definition out of the "main" function, as in the sketch below (but I don't know whether this applies to the problem above...).
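For reference, the layout that worked for the docs example was roughly the following (a sketch, assuming a standard main object; the key point is that WordCount is defined at the top level rather than inside main):

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Case class at the top level, not inside main, so the connector can
// derive a ColumnMapper for it.
case class WordCount(word: String, count: Long)

object Example {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
    collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
  }
}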