
We have been using the Spark RDD API (Spark 2.0) to work with data modeled in Cassandra. Note that the data is modeled in Cassandra for efficient reads and writes.

However, there are now also the Spark SQL and Spark DataFrame APIs, which offer an alternative data access method: http://spark.apache.org/docs/latest/sql-programming-guide.html

With Spark RDD, we were using CQL through the DataStax Cassandra driver APIs to access the Cassandra DB (http://docs.datastax.com/en/developer/java-driver/2.0/), something like:

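import java.util
import scala.collection.JavaConverters._
import com.datastax.driver.core.{ResultSet, Row}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.spark.connector.cql.CassandraConnector

val resultSets = new util.ArrayList[Row]()
// Run one CQL SELECT through a session borrowed from the connector
val resultSet = CassandraConnector(SparkReader.conf).withSessionDo[ResultSet] { session =>
  val sel_stmt = QueryBuilder.select("yyy", "zz", "xxxx")
    .from("geokpi_keyspace", table_name)
    .where(QueryBuilder.eq("bin", bin))
    .and(QueryBuilder.eq("year", year))
    .and(QueryBuilder.eq("month", month))
    .and(QueryBuilder.eq("day", day))
    .and(QueryBuilder.eq("cell", cell))

  session.execute(sel_stmt)
}
// Materialize every row of the result set locally
resultSets.addAll(resultSet.all())
// Local List[Row] that backs the resulting RDD[Row]
resultSets.asScala.toList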

Since we are using CQL almost directly, we cannot do things that Cassandra does not support, such as JOINs, which are excluded by Cassandra's design. However, the alternative way of accessing the Cassandra DB, through Spark SQL or the Spark DataFrame API, gives you an SQL-type abstraction. For an underlying relational DB this would be good.

But using this abstraction (a JOIN, for example) to query data stored in a NoSQL database like Cassandra seems to be the wrong abstraction. If we work with this abstraction in Spark without knowing anything about the data model (partition key, clustering key, etc.), which is so important for efficient reads and writes, won't it lead to inefficient generated code and inefficient/slow data retrieval from the underlying Cassandra nodes?

Erick Ramirez
Alex Punnen

1 Answer


I would argue that your assumption that we ignore the data model when working with Spark SQL is incorrect. In practice we work under a very strict contract, where a data source may by default process only basic projections and selections, and the heavy processing is performed by the Spark cluster.

At the same time, data source developers have a free hand to include any type of domain- or system-specific knowledge when designing a given connector. The JDBC data source is a nice example of that; see, for example, my answer to "How to partition Spark RDD when importing Postgres using JDBC?" for how it can be used to perform some non-standard operations.
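As a rough illustration of the JDBC point (a sketch under assumptions, not the code from the linked answer): the JDBC reader accepts an array of predicates, one per Spark partition, so knowledge about how the table is laid out can be handed straight to the database. The `year` and `month` columns and the `JdbcPartitionedRead` object are hypothetical names chosen for this example.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcPartitionedRead {
  // One WHERE-clause fragment per Spark partition.
  // The column names `year` and `month` are assumptions for illustration.
  def monthPredicates(year: Int): Array[String] =
    (1 to 12).map(m => s"year = $year AND month = $m").toArray

  // Each predicate becomes one partition of the resulting DataFrame and is
  // evaluated by the database, not by Spark.
  def read(spark: SparkSession, url: String, table: String, year: Int): DataFrame =
    spark.read.jdbc(url, table, monthPredicates(year), new Properties())
}
```

The split is chosen by whoever knows the data, which is exactly the kind of system-specific knowledge a connector can expose.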

While the Cassandra Connector seems to be slightly limited here (forgive me if I am wrong, I haven't used it extensively), its RDD component provides a wide set of Cassandra-aware operations which can be used to perform server-side operations and optimize the overall workflow.

One way or another, there is no case in which Spark tries to force an external system to perform an operation which is not supported there.
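For instance, the query from the question could probably be expressed with the connector's RDD API roughly as follows. This is a minimal sketch: the keyspace and column names come from the question, while the `Int` key types, the `CassandraRddRead` object, and the assumption that these five columns are valid CQL restrictions for the table are mine.

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CassandraRddRead {
  // Columns restricted in the question's query
  val keyColumns: Seq[String] = Seq("bin", "year", "month", "day", "cell")

  // "bin = ? AND year = ? AND ..." with one placeholder per key column
  val whereClause: String = keyColumns.map(c => s"$c = ?").mkString(" AND ")

  // `select` and `where` are sent to Cassandra as part of the CQL query,
  // so only the requested columns and matching rows reach Spark.
  // The Int parameter types are an assumption about the schema.
  def readCell(sc: SparkContext, table: String,
               bin: Int, year: Int, month: Int, day: Int, cell: Int): RDD[CassandraRow] =
    sc.cassandraTable("geokpi_keyspace", table)
      .select("yyy", "zz", "xxxx")
      .where(whereClause, bin, year, month, day, cell)
}
```

The result is a lazy, distributed RDD rather than a local list, so it can be combined with other RDDs without collecting anything first.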

won't it lead to in-efficient generated code and in-efficient/slow data retrieval

The fundamental question we have to ask here is why it would matter. By the mere fact of using a given source for analytical jobs, we implicitly accept that we may stress the system in a way that is not typical of its daily operational usage.

At the same time, if we use a system which doesn't support certain operations required in our data processing pipeline, we should accept that the cost of performing these operations may be significantly higher than in an optimized system. Inefficient processing costs money, so this should be taken into consideration when choosing a technology stack and designing infrastructure.

Finally, if some operations have an unacceptable performance impact (yes, joins are expensive), this should be reflected in the data modeling.

Since we are using CQL almost directly, it does not allow you to do things that are not supported by Cassandra like JOINS as Cassandra design does not support it

As already explained, neither does Spark SQL. Fetching data directly and performing the join later doesn't change anything in the execution model.

Ignoring that, there is nothing in this particular example that cannot be handled by the DataFrame API, and more complex retrieval can be performed with cassandraTable.
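A sketch of what the DataFrame version of the example might look like, assuming the Cassandra connector is on the classpath. The keyspace and column names are from the question; the `Int` key types and the `CassandraDataFrameRead` object are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CassandraDataFrameRead {
  // Options understood by the connector's DataFrame source
  def cassandraOptions(table: String): Map[String, String] =
    Map("keyspace" -> "geokpi_keyspace", "table" -> table)

  // Same selection and projection as the CQL query in the question.
  // The Int parameter types are an assumption about the schema.
  def readCell(spark: SparkSession, table: String,
               bin: Int, year: Int, month: Int, day: Int, cell: Int): DataFrame =
    spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(cassandraOptions(table))
      .load()
      .filter(col("bin") === bin && col("year") === year && col("month") === month &&
        col("day") === day && col("cell") === cell)
      .select("yyy", "zz", "xxxx")

  // The join itself is executed by Spark on the fetched rows;
  // it is never sent to Cassandra.
  def joinOnCell(left: DataFrame, right: DataFrame): DataFrame =
    left.join(right, Seq("cell"))
}
```

Calling `explain()` on the returned DataFrame should show which filters were handed to the source and which stay in Spark.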

zero323
  • "Well, if you can handle data using local data structures, as in your example, why use Spark in the first place? If data can be stored in a memory of a single machine, there are solutions out there, which can do much better job than Spark" --> Our data cannot be held in one Spark or Cassandra node. We use about 4 Cassandra nodes to store the data and read it in parallel from 2 to 4 Spark worker nodes. Spark is used for distributed parallel processing of the data. Spark is very much needed; otherwise we would have to roll our own message-based task system with error handling etc. – Alex Punnen Sep 22 '16 at 04:06
  • So how does converting the query result to a local non-lazy structure (`resultSets.asScala.toList`) fit into that? – zero323 Sep 22 '16 at 12:12
  • The point is that with direct CQL usage as shown, there is no chance of using a join or similar, which Spark SQL permits. I hope you got my question. – Alex Punnen Sep 22 '16 at 14:02
  • Yeah, but it doesn't make sense :) I mean parallelizing local objects is usually an antipattern (unless you have custom RDDs to back that and it is not a driver process). Moreover, Spark SQL doesn't push anything beyond the things I already stated to Cassandra. From its perspective it is not much different from a flat text file. – zero323 Sep 22 '16 at 14:07
  • I think you should be more worried about the fact that Spark SQL is actually a brute-force approach which basically fetches everything and processes it later. There are of course more advanced options in the connector, like `joinWithCassandraTable`, but that is not Spark SQL. – zero323 Sep 22 '16 at 14:15
  • It is local to the Spark worker; the fetch happens from each Spark worker, and we have two to four of these. – Alex Punnen Sep 22 '16 at 14:47
  • I stand corrected. Still it doesn't provide any improvement over the existing model. – zero323 Sep 22 '16 at 15:11