I am writing a set of methods that extend Spark RDD's API. I have to implement a general method for storing the RDDs, and for a start I tried to wrap spark-cassandra-connector's saveAsCassandraTable, without success.

Here's the "extending RDD's API" part:

object NewRDDFunctions {
  implicit def addStorageFunctions[T](rdd: RDD[T]):
  RDDStorageFunctions[T] = new RDDStorageFunctions(rdd)
}

class RDDStorageFunctions[T](rdd: RDD[T]) {
  def saveResultsToCassandra() {
    rdd.saveAsCassandraTable("ks_name", "table_name")    // this line produces errors!
  }
}

...and I import the object with: import ...NewRDDFunctions._

The marked line produces the following errors:

Error:(99, 29) could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[T]
    rdd.saveAsCassandraTable("ks_name", "table_name")
                            ^

Error:(99, 29) not enough arguments for method saveAsCassandraTable: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[T], implicit columnMapper: com.datastax.spark.connector.mapper.ColumnMapper[T])Unit.
Unspecified value parameters rwf, columnMapper.
    rdd.saveAsCassandraTable("ks_name", "table_name")
                            ^

I don't get why this doesn't work since saveAsCassandraTable is designed to work on any RDD. Any suggestions?


I had a similar problem with the example in the spark-cassandra-connector docs:

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))

...and the solution was to move the case class definition out of the "main" function (but I don't really know whether this applies to the problem above...).

bmscicho

1 Answer

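saveAsCassandraTable takes three implicit parameters. The first one (connector) has a default value, but the last two (rwf and columnMapper) are not in implicit scope inside your saveResultsToCassandra method, so the method does not compile. When you call saveAsCassandraTable directly on an RDD with a concrete element type, the compiler can resolve both values at that call site; inside a method that is generic in T it cannot, unless the method asks its own caller for them.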
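The same situation can be reproduced without Spark. The sketch below is only an illustration: Writer, save, and wrapper are hypothetical stand-ins (Writer for RowWriterFactory, save for saveAsCassandraTable), not part of the connector's API.

```scala
// Hypothetical type class standing in for RowWriterFactory[T].
trait Writer[T] { def write(value: T): String }

object Writer {
  // An instance exists only for the concrete type String.
  implicit val stringWriter: Writer[String] = new Writer[String] {
    def write(value: String): String = s"row($value)"
  }
}

object ImplicitDemo {
  // Stand-in for saveAsCassandraTable: it needs an implicit Writer[T].
  def save[T](values: Seq[T])(implicit w: Writer[T]): Seq[String] =
    values.map(w.write)

  // Does not compile: T is abstract here, so no Writer[T] can be found.
  // def wrapperBroken[T](values: Seq[T]): Seq[String] = save(values)

  // Compiles: the wrapper asks its own caller for the instance and passes it on.
  def wrapper[T](values: Seq[T])(implicit w: Writer[T]): Seq[String] =
    save(values)

  def main(args: Array[String]): Unit = {
    // T = String at this call site, so Writer.stringWriter is resolved here.
    println(wrapper(Seq("dog", "cow"))) // prints List(row(dog), row(cow))
  }
}
```

The implicit value is looked up where the type is known, which is the caller of wrapper, and then handed down to save.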

See this answer to another question if you need more information about implicits.

Changing saveResultsToCassandra to the method below should work, provided you have defined your tables (TableDef) beforehand.

def saveResultsToCassandra()(
  // The implicit parameters go in their own, separate parameter list.
  implicit rwf: RowWriterFactory[T],
  columnMapper: ColumnMapper[T]
): Unit = {
  // rwf and columnMapper are now in implicit scope for this call.
  rdd.saveAsCassandraTable("ks_name", "table_name")
}
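For reference, here is how the pieces from the question and this answer might fit together in one file. This is a sketch, not tested against a particular connector version: the import paths are taken from the package names in the compiler errors above, and sc in the usage part is assumed to be an existing SparkContext configured for a reachable Cassandra cluster.

```scala
import scala.language.implicitConversions

import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._ // adds saveAsCassandraTable to RDDs
import com.datastax.spark.connector.mapper.ColumnMapper
import com.datastax.spark.connector.writer.RowWriterFactory

object NewRDDFunctions {
  implicit def addStorageFunctions[T](rdd: RDD[T]): RDDStorageFunctions[T] =
    new RDDStorageFunctions(rdd)
}

class RDDStorageFunctions[T](rdd: RDD[T]) {
  // The caller supplies rwf and columnMapper, resolved for its concrete T.
  def saveResultsToCassandra()(
    implicit rwf: RowWriterFactory[T],
    columnMapper: ColumnMapper[T]
  ): Unit = {
    rdd.saveAsCassandraTable("ks_name", "table_name")
  }
}
```

Usage, with the case class defined at the top level as described in the question:

```scala
import NewRDDFunctions._

case class WordCount(word: String, count: Long)

// sc is assumed to be an existing SparkContext.
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveResultsToCassandra()
```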
Peter Neyens
  • This part of the code works, but when I use my `saveResultsToCassandra` method on an RDD, it raises: `Unspecified value parameters rwf, columnMapper`. How come it does not raise any errors when I use `saveAsCassandraTable("ks_name", "table_name")` directly (omitting my wrapper)? I don't want to define any tables, since `saveAsCassandraTable` does that for me. I looked at the answer you linked, but I don't know how it applies in my case. – bmscicho Jun 27 '15 at 18:10
  • I had to add extra brackets just before `(implicit rwf: RowWriterFactory[T], columnMapper: ColumnMapper[T])`, thanks! – bmscicho Jun 27 '15 at 18:24
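The "extra brackets" in the last comment are the empty parameter list in front of the implicit one. A minimal sketch of why they matter, using hypothetical names (Show, Box, renderA, renderB) that are not part of Spark or the connector:

```scala
// Hypothetical type class, used only to have something implicit to resolve.
trait Show[T] { def show(value: T): String }

object Show {
  implicit val intShow: Show[Int] = new Show[Int] {
    def show(value: Int): String = s"<$value>"
  }
}

class Box[T](value: T) {
  // Only an implicit parameter list: call it without parentheses.
  def renderA(implicit sh: Show[T]): String = sh.show(value)

  // Empty explicit list first, implicit list second: call it as renderB().
  def renderB()(implicit sh: Show[T]): String = sh.show(value)
}

object ParensDemo {
  def main(args: Array[String]): Unit = {
    val box = new Box(42)
    println(box.renderA)   // implicit Show[Int] is resolved
    println(box.renderB()) // implicit Show[Int] is resolved
    // box.renderA()       // does not compile: "()" is read as an empty
    //                     // explicit argument list for sh
  }
}
```

Without the empty first list, writing `()` at the call site supplies an explicit but empty argument list for the implicit parameters, which is what produces an "Unspecified value parameters" error.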