Picture the following scenario: a Spark application (Java implementation) uses a Cassandra database to load data, convert it to RDDs, and process it. The application also streams new data from the database, which is processed by a custom receiver. The output of the streaming process is stored in the database. The implementation uses Spring Data Cassandra for the integration with the database.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main method:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

A large amount of data is expected for the initial load. For this reason the data is paginated, loaded, and distributed in rddBuffer.

There are also the following options available:

  1. The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), although this example has very little documentation.
  2. The Calliope project (http://tuplejump.github.io/calliope/)

I would like to know the best practice for integrating Spark with Cassandra. Which option would be the best to follow in my implementation?

Apache Spark 1.0.0, Apache Cassandra 2.0.8

  • Are you planning to use Spark on a single node or on a cluster? – maasg Jun 27 '14 at 16:35
  • ps: Consider using Scala for Spark. It's so much nicer. – maasg Jun 27 '14 at 16:40
  • This is a prototype implementation, in the next phase all code will be written in Java 8 (lambda expressions) or Scala (closures). The choice of the language will depend on several factors. Also, in cluster, it is pointless to use Spark on a single node. – Pantelis Papapoulias Jun 28 '14 at 20:41
  • @maasg is correct, his answer is good too. Trust us when we say your life will be made 10x easier when you go to Scala. We have a saying "once you go Scala, you never go back". – samthebest Jun 29 '14 at 11:25
  • @samthebest Both Scala and Java 8 seem to offer great advantages over previous versions of Java or Python implementations. Scala seems to be a natural fit with Spark since it is a Scala implementation. But again the choice between those 2 will be evaluated in a later phase. For now my intention is to focus on the fundamentals. – Pantelis Papapoulias Jun 29 '14 at 19:16
  • @Pantelis fair point :) – samthebest Jun 30 '14 at 07:01
  • Since today, there is the 3rd, recommended option to use the official driver for Spark: https://github.com/datastax/cassandra-driver-spark – Piotr Kołaczkowski Jun 30 '14 at 19:48

2 Answers

The easiest way to work with Cassandra and Spark is to use the official open source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector

This driver is built on top of the Cassandra Java Driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. Additionally, it offers the following unique features:

  • support for all Cassandra data types, including collections, out of the box
  • lightweight mapping of Cassandra rows to custom classes or tuples without the need to use any implicits or other advanced features in Scala
  • saving any RDDs to Cassandra
  • full support for Cassandra Virtual Nodes
  • ability to filter / select on the server side, e.g. leveraging Cassandra clustering columns or secondary indexes
Piotr Kołaczkowski
  • Thank you, that was exactly what I needed for my implementation. – Pantelis Papapoulias Jul 01 '14 at 07:06
  • Have a look at the CassandraConnector class provided by the spark-cassandra-connector. It makes it easy to run CQL queries and so on inside operations on RDDs (foreach, map etc.) so they are distributed across the cluster. It is not emphasised enough in the documentation IMO. – David Tinker Oct 07 '14 at 07:55
  • Piotr, can you take a look at this question about DataStax's `spark-cassandra-connector`? http://stackoverflow.com/questions/27130321/fail-to-open-thrift-connection-to-cassandra Thanks. – chrisTina Nov 25 '14 at 18:35

The approach in the code above is a classical centralized algorithm that would work only if executed on one node. Both Cassandra and Spark are distributed systems, and therefore it's necessary to model the process in such a way that it can be distributed among a number of nodes.

There are a few possible approaches. If you know the keys of the rows to fetch, you could do something simple like this (using the DataStax Java Driver):

import com.datastax.driver.core.{BoundStatement, Cluster}

val data = sparkContext.parallelize(keys).map { key =>
   // Connect inside the closure so the connection is created on the worker
   val cluster = Cluster.builder.addContactPoint(host).build()
   val session = cluster.connect(keyspace)
   val statement = session.prepare("...cql...")
   val boundStatement = new BoundStatement(statement)
   session.execute(boundStatement.bind(...data...))
}

This will effectively distribute the fetching of keys across the Spark Cluster. Note how the connection to C* is done within the closure as this ensures that the connection is established when the task is executed on each separate, distributed worker.
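To see why the connection has to be created inside the closure, here is a minimal sketch in plain Java with no Spark or Cassandra dependency. It assumes that a task closure is shipped to workers with Java serialization, and it uses hypothetical stand-ins (`FakeCluster`, `SerializableFunction`) that are not driver or Spark classes. A closure that captures a non-serializable connection object cannot be serialized, while a closure that captures only the host name and connects when it runs can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class ClosureSerializationDemo {

    /** Hypothetical stand-in for a driver Cluster: deliberately NOT Serializable. */
    public static class FakeCluster {
        private final String host;
        FakeCluster(String host) { this.host = host; }
        String fetch(String key) { return key + "@" + host; }
    }

    /** A function that can be shipped to another JVM, like a task closure. */
    public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    /** Returns true if the given closure survives Java serialization. */
    public static boolean canSerialize(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return true;
        } catch (NotSerializableException e) {
            return false; // a captured value was not Serializable
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Connection created up front and captured by the lambda. */
    public static SerializableFunction<String, String> connectOnDriver(String host) {
        FakeCluster cluster = new FakeCluster(host);
        return key -> cluster.fetch(key);
    }

    /** Only the String host is captured; the connection is created when the task runs. */
    public static SerializableFunction<String, String> connectInsideTask(String host) {
        return key -> new FakeCluster(host).fetch(key);
    }

    public static void main(String[] args) {
        System.out.println("driver-side connection serializable: "
                + canSerialize(connectOnDriver("10.0.0.1")));
        System.out.println("in-task connection serializable: "
                + canSerialize(connectInsideTask("10.0.0.1")));
    }
}
```

Running `main` should report that the first closure is not serializable and the second one is. The sketch deliberately ignores the cost of opening a connection per element and the need to close it afterwards.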

Given that your example uses a wildcard (i.e. the keys are not known), using the Hadoop interface of Cassandra is a good option. The Spark-Cassandra example linked in the question illustrates the use of this Hadoop interface on Cassandra.

Calliope is a library that encapsulates the complexity of using the Hadoop interface by providing a simple API to access that functionality. It's available only in Scala, as it uses specific Scala features (like implicits, and macros in the upcoming release). With Calliope, you basically declare how to convert your RDD[type] into a row key and row value, and Calliope takes care of configuring the Hadoop interfaces to do the job. We have found that Calliope (and the underlying Hadoop interfaces) are 2-4x faster than using a driver to interact with Cassandra.

Conclusion: I'd walk away from the Spring-Data configuration to access Cassandra, as this will limit you to a single node. Consider a simple parallelized access if possible or explore using Calliope in Scala.

maasg
  • Thanks for your answer maasg. You make clear the differences between the two options for the Hadoop interface and the Calliope API. But it is not clear why the Spring Data Cassandra option is not a valid one over a cluster architecture. Can you please provide more details on this? In the given sample code the data is loaded locally and distributed on the cluster in batches. An RDD buffer (with union on each batch) is used for this purpose. Is it that each worker node will load the same dataset into an RDD in parallel? – Pantelis Papapoulias Jun 30 '14 at 05:40
  • You should not build a new Cluster in a tight loop of the application. This would kill performance. Also, you need to properly close Cluster instances after use. Otherwise you'll soon run out of memory and/or threads. Cluster and Session instances are thread-safe. It is OK to move them out of the lambda and share them; however, as they are not Serializable, it would not be possible to distribute them. This problem has been solved by the Connector class in the official cassandra-driver-spark module. – Piotr Kołaczkowski Jun 30 '14 at 19:44
  • @PiotrKolaczkowski Given that cluster is not serializable, it needs to be instantiated in the lambda, as that code will be executed by each worker on a distributed Spark system. The case you mention would only work in local mode. Re: cassandra-spark: You are certainly close to the source. I was at the announcement this morning at the Spark Summit. Will be looking into it ASAP. – maasg Jun 30 '14 at 21:43
  • You're right, that's why I mentioned the CassandraConnector class the driver uses to solve exactly this problem. CassandraConnector can be viewed as a serializable connection to Cassandra. It allows for sharing a Cluster between threads on the same JVM, but when sent over the network to a remote node, it reestablishes connection to the cluster transparently. It also offers some nice utilities to avoid resource leaks. – Piotr Kołaczkowski Jul 01 '14 at 07:42
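
The "serializable connection" idea from the comments above can be sketched in plain Java. This is not the CassandraConnector implementation, only an illustration of the general pattern under stated assumptions: `Connector` and `FakeSession` are hypothetical names, and "shipping to a remote node" is simulated with a serialization round trip. The handle serializes only its configuration, keeps the live session in a `transient` field, and reopens it lazily after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableConnectorDemo {

    /** Hypothetical stand-in for a live, non-serializable driver session. */
    public static class FakeSession {
        public final String host;
        FakeSession(String host) { this.host = host; }
    }

    /** Serializable handle: ships only the configuration, never the live session. */
    public static class Connector implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String host;
        // transient: skipped by serialization, so it is null again after deserialization
        private transient FakeSession session;

        public Connector(String host) { this.host = host; }

        /** Lazily opens the session; synchronized so threads using this handle share it. */
        public synchronized FakeSession session() {
            if (session == null) {
                session = new FakeSession(host);
            }
            return session;
        }
    }

    /** Simulates shipping the handle to another JVM via Java serialization. */
    public static Connector roundTrip(Connector connector) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(connector);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Connector) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Connector local = new Connector("10.0.0.1");
        FakeSession first = local.session();
        Connector shipped = roundTrip(local);
        System.out.println("same session on repeated calls: " + (first == local.session()));
        System.out.println("new session after shipping: " + (first != shipped.session()));
        System.out.println("same host after shipping: "
                + first.host.equals(shipped.session().host));
    }
}
```

The sketch leaves out closing sessions and sharing one session across several handles in the same JVM, both of which a real connector would need to address.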