
I have a table in Cassandra whose structure is like this:

CREATE TABLE dmp.Table (
    pid text PRIMARY KEY,
    day_count map<text, int>,
    first_seen map<text, timestamp>,
    last_seen map<text, timestamp>,
    usage_count map<text, int>
);

Now I'm trying to query it using the spark-cassandra connector. Is there any way I can get the data in chunks? That is, if I have 100 rows, I should be able to get rows 0-10, then 10-20, and so on.

 CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);

I'm asking this because there is no column in my table that I can query with an IN clause to get a range of rows.


1 Answer


You can add an auto-incrementing ID column -- see my DataFrame-ified Zip With Index solution. Then you can query by the newly-created id column:

SELECT ... WHERE id >= 0 and id < 10;

Etc.
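The linked solution is written in Scala. Since the question uses the Java API, here is a rough sketch of the same idea in Java, assuming Spark 1.x (SQLContext/DataFrame); the class and method names (ZipWithIndexJava, withIdColumn) are purely illustrative, not part of any library:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class ZipWithIndexJava {

        // Appends a 0-based long "id" column to df using RDD.zipWithIndex().
        // The zipping is done by the executors, one partition at a time.
        public static DataFrame withIdColumn(SQLContext sqlContext, DataFrame df) {
            // Pair every row with its global position, then rebuild the row
            // with the index appended as a new last column.
            JavaRDD<Row> rowsWithId = df.javaRDD()
                    .zipWithIndex()
                    .map(tuple -> {
                        Row row = tuple._1();
                        Long id = tuple._2();
                        Object[] values = new Object[row.length() + 1];
                        for (int i = 0; i < row.length(); i++) {
                            values[i] = row.get(i);
                        }
                        values[row.length()] = id;
                        return RowFactory.create(values);
                    });

            // Extend the original schema with the new long "id" field.
            List<StructField> fields = new ArrayList<>();
            for (StructField f : df.schema().fields()) {
                fields.add(f);
            }
            fields.add(DataTypes.createStructField("id", DataTypes.LongType, false));
            StructType schemaWithId = DataTypes.createStructType(fields);

            return sqlContext.createDataFrame(rowsWithId, schemaWithId);
        }
    }

Once the id column exists, you can register the result as a temp table and run the windowed SELECT shown above.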

  • Hi, thanks for the answer, but how do I do the same using Java? Also, for this to work I guess I have to read the entire table, and that's where the problem lies: I can't read the entire table. Please correct me if I'm wrong. – Rahul Koshaley Mar 21 '16 at 07:37
  • @RahulKoshaley depends on what you mean by "read the entire table". In order to add the `id` column, the executors have to read the entire table, but your driver application does not. This lets you "window" the data, and only read N number of rows in the driver at a time. – David Griffin Mar 21 '16 at 11:05
  • Exactly how will I choose the N number of rows? – Rahul Koshaley Mar 21 '16 at 11:08
  • It's in my answer -- just add an offset to choose which window: `"SELECT ... WHERE id >= " + offset + " and id < " + (offset + 10)` – David Griffin Mar 21 '16 at 11:11
  • Sorry, what I meant was: after altering the table and adding an id column, how will I put values into it? I.e. I have to read all the table's rows and then put a value in the id column, as I can't select N rows (parts) with the existing table structure. – Rahul Koshaley Mar 21 '16 at 11:21
  • Follow the link in my answer -- `zipWithIndex` is an `RDD` function built into Spark. In the link, that function is used to create an `id` column. The populating is done for you. Yes, the entire table (`RDD` actually) gets read, but only a partition at a time and only in the executors. – David Griffin Mar 21 '16 at 11:27
  • Thanks David, I will have to do the same in Java. – Rahul Koshaley Mar 21 '16 at 11:40
  • One last question: how is it that only a partition gets read? Is it due to the zipWithIndex function? – Rahul Koshaley Mar 21 '16 at 11:42
  • Basically, yes. In a nutshell, the code converts the `DataFrame` to an `RDD` using `DataFrame.rdd`, then runs `zipWithIndex`, which in Scala creates an `RDD` of tuples. You then convert this `RDD` back to a `DataFrame` by adding the new `Long` column to the original `DataFrame.columns` – David Griffin Mar 21 '16 at 11:45
  • Yes -- that's how `zipWithIndex` works. Simplifying it, but each partition is "zipped" separately. The driver calculates the number of rows per partition, and therefore the starting offset of each partition. Executors are given a partition and a starting index and apply it to that partition. – David Griffin Mar 21 '16 at 11:49
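Pulling the comment thread together, here is a hedged sketch of the windowing described above: the table is indexed once in the executors, and the driver then fetches only N rows per iteration. It assumes the hypothetical withIdColumn helper from the earlier sketch, an existing Spark 1.x SQLContext, and the keySpaceName/tableName variables from the question; the window size of 10 is arbitrary.

    // Read the Cassandra table as a DataFrame via the spark-cassandra-connector.
    DataFrame df = sqlContext.read()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", keySpaceName)
            .option("table", tableName)
            .load();

    // Add the id column (executors read the table, one partition at a time),
    // then cache so the ids stay stable and Cassandra isn't re-read per window.
    DataFrame indexed = ZipWithIndexJava.withIdColumn(sqlContext, df);
    indexed.cache();
    indexed.registerTempTable("indexed_rows");

    long total = indexed.count();
    int windowSize = 10; // N rows per chunk

    for (long offset = 0; offset < total; offset += windowSize) {
        DataFrame window = sqlContext.sql(
                "SELECT * FROM indexed_rows WHERE id >= " + offset
                + " AND id < " + (offset + windowSize));
        // Only this window's rows are collected back to the driver.
        List<Row> chunk = window.collectAsList();
        // ... process the chunk ...
    }

Caching the indexed DataFrame keeps the generated ids stable across windows and avoids re-reading Cassandra for every iteration of the loop.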