I'm trying to get the distinct values of the partition key of a Cassandra table in PySpark. However, Spark seems not to understand me and does a full scan of all the data (which is a lot) instead of querying the partition-key index.
This is the code I use, which looks pretty straightforward to me:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Spark! This town not big enough for the two of us.") \
    .getOrCreate()
ct = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="avt_sensor_data", keyspace="ipe_smart_meter") \
    .load()

all_sensors = ct.select("machine_name", "sensor_name") \
    .distinct() \
    .collect()
The columns "machine_name" and "sensor_name" together form the partition key (see below for the complete schema). In my opinion, this should be super-fast, and in fact, if I execute this query in CQL, it takes only a couple of seconds:
select distinct machine_name,sensor_name from ipe_smart_meter.avt_sensor_data;
However, the Spark job takes about 10 hours to complete. From what Spark tells me about its plan, it looks like it really wants to iterate over all the data:
== Physical Plan ==
*HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
+- Exchange hashpartitioning(machine_name#0, sensor_name#1, 200)
   +- *HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@2ee2f21d [machine_name#0,sensor_name#1] ReadSchema: struct<machine_name:string,sensor_name:string>
I'm not an expert, but that doesn't look like "use the Cassandra index" to me.
What am I doing wrong? Is there any way to tell Spark to delegate the task of getting the distinct values to Cassandra? Any help would be greatly appreciated!
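For reference, this is roughly the stopgap I could use in the meantime: run the DISTINCT query directly against Cassandra (which is served from the partition index, hence fast) and only hand the resulting keys to Spark afterwards. This is just a sketch assuming the cassandra-driver package and a reachable contact point; fetch_distinct_keys is my own helper, not part of any API:

```python
# Possible workaround sketch (not what I want long-term): fetch the
# distinct partition keys with a plain CQL query, outside of Spark.
# Assumes the cassandra-driver package; fetch_distinct_keys is my own helper.

def fetch_distinct_keys(session):
    """Return all (machine_name, sensor_name) partition keys via CQL."""
    rows = session.execute(
        "SELECT DISTINCT machine_name, sensor_name "
        "FROM ipe_smart_meter.avt_sensor_data"
    )
    return [(row.machine_name, row.sensor_name) for row in rows]

# Usage (requires a running cluster):
# from cassandra.cluster import Cluster
# session = Cluster(["127.0.0.1"]).connect()
# keys = fetch_distinct_keys(session)
# all_sensors = spark.createDataFrame(keys, ["machine_name", "sensor_name"])
```

But I'd much rather have Spark do the push-down itself than manage a second connection to the cluster.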
In case it helps, here is the schema of the underlying Cassandra table:
CREATE KEYSPACE ipe_smart_meter WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'} AND durable_writes = true;
CREATE TABLE ipe_smart_meter.avt_sensor_data (
    machine_name text,
    sensor_name text,
    ts timestamp,
    id bigint,
    value double,
    PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = '[PRODUCTION] Table for raw data from AVT smart meters.'
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';