
I am a newbie to Spark and Cassandra. I am trying to insert into a Cassandra table using the spark-cassandra connector, as below:

import java.util.UUID

import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._

case class TestEntity(id: UUID, category: String, name: String, value: Double, createDate: DateTime, tag: Long)

object SparkConnectorContext {
  val conf = new SparkConf(true).setMaster("local")
    .set("spark.cassandra.connection.host", "192.168.xxx.xxx")
  val sc = new SparkContext(conf)
}
object TestRepo {
  def insertList(list: List[TestEntity]) = {
    SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
  }
}
object TestApp extends App {
  val start = System.currentTimeMillis()
  TestRepo.insertList(Utility.generateRandomData())
  val end = System.currentTimeMillis()
  val timeDiff = end-start
  println("Difference (in millis)= "+timeDiff)
}

When I insert using the above method (a list with 100 entities), it takes 300-1100 milliseconds. I tried inserting the same data with the phantom library, and it takes only 20-40 milliseconds.

Can anyone tell me why the spark connector takes this much time for an insert? Am I doing anything wrong in my code, or is it not advisable to use the spark-cassandra connector for insert operations?

Yadu Krishnan
  • How many Cassandra nodes do you have? Are your spark workers running on the Cassandra nodes? I don't see any timing measurements in your code, so it looks like you are measuring a lot more operations than just the insert time. – Jim Meyer Aug 11 '15 at 12:49
  • I just started with Cassandra and Spark, so I am using local Spark. Cassandra is on a different machine, but on the same network. As for the timing measurement, I have edited the code. – Yadu Krishnan Aug 11 '15 at 13:00
  • @YaduKrishnan Spark is scattering operations all over the network, whereas phantom performs direct writes parallelised on the JVM through classic multi-threading. It's also far faster at data mapping, since most of the magic happens at compile time and there's no runtime boxing because all conversions are specialized; the spark connector doesn't handle the Scala side quite as well. – flavian Aug 12 '15 at 08:23
  • @flavian: Thanks for the info. I will be using Spark for all the analysis. But as for the insert into Cassandra, which way is better: using phantom directly or the spark-connector? I will be getting a list of data at intervals of a few seconds. – Yadu Krishnan Aug 12 '15 at 09:26
  • @YaduKrishnan If there's nothing stopping you from using phantom for the insert, go for it. – flavian Aug 12 '15 at 10:04
  • @flavian spark cassandra connector neither does the mapping at runtime. It does part of the mapping at compile time as well, and the part which is dependent on the database schema known only at runtime is done at initialization time, only once. The mapping part never showed up to take significant part of running time in any profiling we did and it is very unlikely to cause performance differences. Sure, we probably could make mapping and conversions faster at expense of using experimental and fragile features like macros, but that would be optimizing sth that takes less than 1% of total time. – Piotr Kołaczkowski Aug 12 '15 at 17:58
  • @PiotrKolaczkowski Fair enough, I'm not that familiar with the internals of the Spark connector. I think we are talking about tools built for completely different purposes, and, as you point out in your answer, the benchmark difference comes from setup time; I doubt anyone has a magic wand for higher-speed I/O here. – flavian Aug 13 '15 at 08:29

2 Answers


It looks like you are including the parallelize operation in your timing. Also since you have your spark worker running on a different machine than Cassandra, the saveToCassandra operation will be a write over the network.

Try configuring your system to run the spark workers on the Cassandra nodes. Then create an RDD in a separate step and invoke an action like count() on it to load the data into memory. Also you might want to persist() or cache() the RDD to make sure it stays in memory for the test.

Then time just the saveToCassandra of that cached RDD.
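
A minimal sketch of that separation, reusing the keyspace, table, and TestEntity data from the question (Utility.generateRandomData() is the asker's own helper, so it is assumed to exist here):

import com.datastax.spark.connector._

// Build and materialize the RDD first, outside the timed section.
val rdd = SparkConnectorContext.sc.parallelize(Utility.generateRandomData())
rdd.cache()
rdd.count() // an action forces the RDD to be computed and cached

// Now time only the write to Cassandra.
val start = System.currentTimeMillis()
rdd.saveToCassandra("testKeySpace", "testColumnFamily")
println("saveToCassandra took (millis) = " + (System.currentTimeMillis() - start))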

You might also want to look at the repartitionByCassandraReplica method offered by the Cassandra connector. That would partition the data in the RDD based on which Cassandra node the writes need to go to. In that way you exploit data locality and often avoid doing writes and shuffles over the network.
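
A sketch of that approach, assuming a connector version that provides repartitionByCassandraReplica (1.2+) and that the table's partition key matches the TestEntity mapping:

import com.datastax.spark.connector._

val rdd = SparkConnectorContext.sc.parallelize(Utility.generateRandomData())
// Repartition so each record lands on a Spark partition co-located with the
// Cassandra replica that owns it (assumes id is the partition key).
val localized = rdd.repartitionByCassandraReplica("testKeySpace", "testColumnFamily")
localized.saveToCassandra("testKeySpace", "testColumnFamily")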

Jim Meyer
  • I have tried with a cached RDD, and it improves performance drastically. But my doubt is: if a new record is inserted into the C* table that the cached RDD was built from, will the cache be cleared or updated automatically? Or will it remain the same? – Yadu Krishnan Aug 12 '15 at 09:28
  • RDDs are immutable, so you cannot change them. In Spark you create new RDDs rather than modify existing ones. Cached RDDs will be evicted when Spark needs the memory for other things. – Jim Meyer Aug 12 '15 at 10:55

There are some serious problems with your "benchmark":

  1. Your data set is so small that you're mostly measuring only the job setup time. Saving 100 entities should take on the order of single milliseconds on a single node, not seconds. Also, saving 100 entities gives the JVM no chance to compile the code you run into optimized machine code.
  2. You included spark context initialization in your measurement. The JVM loads classes lazily, so the code for Spark initialization is really invoked after the measurement has started. This is an extremely costly step, typically performed only once per Spark application, not even once per job.
  3. You perform the measurement only once per launch. This means you're also measuring spark context setup and job setup time incorrectly, because the JVM has to load all the classes for the first time and HotSpot has probably had no chance to kick in.

To summarize, you're very likely measuring mostly class loading time, which is dependent on the size and number of classes loaded. Spark is quite a large thing to load and a few hundred milliseconds are not surprising at all.

To measure insert performance correctly:

  • use a larger data set
  • exclude one-time setup from the measurement
  • do multiple runs sharing the same spark context and discard a few initial ones, until you reach steady-state performance (see the sketch below).
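
For instance, a rough sketch of a loop along those lines (the run counts here are arbitrary, and Utility.generateRandomData() is the asker's own helper):

import com.datastax.spark.connector._

val sc = SparkConnectorContext.sc // one context shared by all runs
val timings = (1 to 20).map { _ =>
  val data = sc.parallelize(Utility.generateRandomData())
  data.cache().count() // materialize the RDD before timing
  val start = System.nanoTime()
  data.saveToCassandra("testKeySpace", "testColumnFamily")
  (System.nanoTime() - start) / 1e6 // millis
}
val steadyState = timings.drop(5) // discard warm-up runs
println("average insert time (ms): " + steadyState.sum / steadyState.size)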

BTW, if you enable the debug logging level, the connector logs the insert times for every partition in the executor logs.

Piotr Kołaczkowski