
When querying Cassandra with a non-indexed column in the where clause, the Spark-Cassandra-Connector's official documentation says:

To filter rows, you can use the filter transformation provided by Spark. However, this approach causes all rows to be fetched from Cassandra and then filtered by Spark.

I am a bit confused about this. If, for example, I have a billion rows with this schema: ID, City, State, and Country, where only ID is indexed, and I use City = 'Chicago' in the where clause, would Spark first download all billion rows and then filter out the rows where City = 'Chicago'? Or would it read a chunk of data from Cassandra, run the filter, keep the rows that match the criteria, then fetch another chunk, keep the matching rows, and so on? And if at any point RAM and/or disk storage runs low, would it discard the data that didn't match the criteria and fetch a new chunk to continue the process?

Also, can someone tell me a general formula to calculate how much disk space it would take to store one bigdecimal column and 3 text columns for a billion rows?

Faraz

2 Answers


Filtering rows can happen either in the database or in Spark. What the documentation recommends is to filter records in the database as much as possible, instead of doing it in Spark. What that means:

sc.cassandraTable("test", "cars")
  .select("id", "model")
  .where("color = ?", "black")

The above statement is going to run the color = 'black' filter in Cassandra, the database, so Spark is not going to fetch into its memory any records with a color other than black. Instead of pulling the billion records into memory, Spark may load just the few million that happen to have black as the value in the color column.

In contrast, filtering can be done in Spark:

sc.cassandraTable("test", "cars")
  .select("id", "model")
  .filter(car -> "black".equals(car.getColor()))

This last version will load all billion records into Spark's memory and then filter them by color in Spark. Obviously, this is not preferable to the previous version, which minimizes the amount of memory needed by the Spark cluster. So for any simple filtering that can be handled in the database, the database/driver/query filters should be used.
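To illustrate that advice, here is a minimal sketch combining both: push the part Cassandra can evaluate with .where, and keep only the remaining, more complex filtering in Spark. The year column is made up for the example; only id, model and color appear in the snippets above.

sc.cassandraTable("test", "cars")
  .select("id", "model", "year")
  .where("color = ?", "black")                 // evaluated in Cassandra, only black cars are fetched
  .filter(row => row.getInt("year") >= 2015)   // hypothetical extra condition, evaluated in Spark

This way the bulk of the reduction happens in the database, and Spark only post-filters the rows it actually received.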

About estimating memory requirements, there have been other questions that proposed various approaches; please check this and this. There's also a good suggestion in Spark's documentation:

How much memory you will need will depend on your application. To determine how much your application uses for a certain dataset size, load part of your dataset in a Spark RDD and use the Storage tab of Spark’s monitoring UI (http://<driver-node>:4040) to see its size in memory. Note that memory usage is greatly affected by storage level and serialization format – see the tuning guide for tips on how to reduce it.
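In practice, a minimal sketch of that suggestion against the same test.cars table (the 1% sample fraction is an arbitrary choice for illustration):

import org.apache.spark.storage.StorageLevel

val sample = sc.cassandraTable("test", "cars")
  .sample(withReplacement = false, fraction = 0.01)   // load roughly 1% of the table
  .persist(StorageLevel.MEMORY_ONLY)

sample.count()   // materialize the sample so it appears under the Spark UI's Storage tab

Then read the in-memory size off the Storage tab and extrapolate to the full data set (keeping in mind that storage level and serialization format change the footprint).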

ernest_k
  • In that test.cars example, I am assuming that `color` is indexed. Otherwise, wouldn't it throw an `ALLOW FILTERING` error? – Faraz Apr 01 '18 at 06:25
  • @FarazDurrani That's right. Cassandra's primary key/partition key-related query rules still apply. I assumed it was legal to filter by that field. – ernest_k Apr 01 '18 at 06:32
  • If you want to do that and cannot create a secondary index on the table field, then you can only do it using Spark's filter (RDD or data frame filtering). – ernest_k Apr 01 '18 at 06:37
  • I think even if I use a non-indexed column in where clause, it will still happen at the DB (Cassandra) level. And what I get in my memory is only those rows that match the criteria. – Faraz Apr 01 '18 at 06:56
  • Okay. Let's be clear on 2 things. First, filtering in *the database* only happens when you use `.where(cql predicate)` (not with `.filter`). Second, you're right about `allow filtering` (even if the connector may still hit a db error - docs say *not all predicates are allowed by the Cassandra engine*). In the end, it will be up to you the data owner to determine what the efficient way of running the query is, and what can be tuned (indices, or rather running the filter in spark)... – ernest_k Apr 01 '18 at 06:57
  • @FarazDurrani I was also clearing up a lot of my own confusion :-). I suppose I also wanted to make the point that *it's preferable to make the query deterministic and db-filter on indexed columns*. I'd personally prefer to err on the side of what the db guarantees rather than on allowed features that may change in future. – ernest_k Apr 01 '18 at 07:04
  • Cool. If you find that the required memory is simply inordinate, remember you can still use Cassandra's materialized views... – ernest_k Apr 01 '18 at 07:13
  • Thx for that recommendation. – Faraz Apr 01 '18 at 07:20
  • Can you elaborate on `not all predicates are allowed by the Cassandra engine` please? Please explain it with querying unindexed columns in where clause in mind. – Faraz Apr 01 '18 at 07:50
  • Oh that came from https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where. Don't know what the specifics and exceptions are, but examples similar to your filter are almost everywhere, so I suppose that should be just fine. – ernest_k Apr 01 '18 at 09:12
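Following up on the secondary-index point raised in the comments above, a minimal sketch of making the db-side .where("color = ?", ...) legal on a non-key column. The index name is made up, and using the connector's CassandraConnector here is just for illustration; running the same CQL in cqlsh works equally well:

import com.datastax.spark.connector.cql.CassandraConnector

// Create a secondary index on color so Cassandra can serve the equality predicate
// without ALLOW FILTERING. The index name is hypothetical.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("CREATE INDEX IF NOT EXISTS cars_color_idx ON test.cars (color)")
}

Whether a secondary index is actually a good fit depends on the column's cardinality; otherwise filtering in Spark, or a materialized view as mentioned in the comments, may be the better option.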

The Spark Cassandra connector will issue multiple queries (one per Spark task), each for a specific token range. So overall it will be a full table scan, but it will be done one piece at a time, and in parallel. If you run a Spark worker on each Cassandra node, the connector will choose token ranges that match the local Cassandra node, which limits data shuffling across the network. Still, a full table scan happens, which is not ideal.
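As an illustration of that behaviour, a minimal sketch showing where the parallelism comes from. The split-size property name below is the one used by recent connector versions (older versions spell it spark.cassandra.input.split.size_in_mb), and the contact-point host and the 64 MB value are arbitrary examples:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

val conf = new SparkConf()
  .setAppName("token-range-scan")
  .set("spark.cassandra.connection.host", "127.0.0.1")    // assumed Cassandra contact point
  .set("spark.cassandra.input.split.sizeInMB", "64")      // target amount of data per split
val sc = new SparkContext(conf)

val cars = sc.cassandraTable("test", "cars")
println(cars.getNumPartitions)   // one partition (and one Spark task) per group of token ranges

Each of those partitions is fetched with its own token-range query, which is why the scan runs in parallel and, with workers co-located with Cassandra nodes, mostly against local data.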

Christophe Schmitz
  • Even if it's a full table scan, what will end up in my RAM? Rows matching the criteria, right? – Faraz Apr 01 '18 at 06:51