When people mention pyspark-cassandra, it's mostly because it exposes the RDD part of the Spark Cassandra Connector (SCC) that isn't exposed by SCC itself: for Python, SCC exposes only the DataFrame API.
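For illustration, reading a table as an RDD via pyspark-cassandra looks roughly like this (a minimal sketch, assuming the pyspark-cassandra package is on the classpath and a plain Cassandra host rather than Astra; the host, keyspace, and table names are placeholders):

from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext

conf = SparkConf() \
    .setAppName("rdd-example") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
# CassandraSparkContext adds cassandraTable() on top of the regular SparkContext
sc = CassandraSparkContext(conf=conf)
rdd = sc.cassandraTable("test", "t2")  # returns an RDD of table rows
print(rdd.count())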
How to use SCC with Astra is well described in the SCC 2.5.0 release announcement blog post and in the documentation. You start pyspark with the following command (username, password, and the other parameters, except --packages, may also be set inside your code instead of on the command line; see the sketch after the command):
pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
--files path_to/secure-connect-test.zip \
--conf spark.cassandra.connection.config.cloud.path=secure-connect-test.zip \
--conf spark.cassandra.auth.username=UserName \
--conf spark.cassandra.auth.password=Password \
--conf spark.dse.continuousPagingEnabled=false
Please note the flag disabling continuous paging: it's currently required, as described in this post.
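If you'd rather keep the credentials out of the command line (for example in a standalone application launched with spark-submit), the same settings can be applied when building the session. A minimal sketch, assuming the secure connect bundle is distributed via --files as above:

from pyspark.sql import SparkSession

# everything except --packages can be set here instead of on the command line
spark = SparkSession.builder \
    .appName("astra-example") \
    .config("spark.cassandra.connection.config.cloud.path", "secure-connect-test.zip") \
    .config("spark.cassandra.auth.username", "UserName") \
    .config("spark.cassandra.auth.password", "Password") \
    .config("spark.dse.continuousPagingEnabled", "false") \
    .getOrCreate()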
After the shell has started, just execute Spark commands that read, transform, and write data:
>>> from pyspark.sql.functions import col
# read data
>>> data = spark.read.format("org.apache.spark.sql.cassandra")\
.options(table="t2", keyspace="test").load()
>>> data.count()
5
>>> data.show(5, truncate=False)
+---+-----------------------+
|id |tm |
+---+-----------------------+
|4 |2020-06-23 10:37:25.825|
|3 |2020-06-23 10:37:25.754|
|5 |2020-06-23 10:37:25.852|
|1 |2020-06-23 10:37:25.701|
|2 |2020-06-23 10:37:25.726|
+---+-----------------------+
# generate new data frame
>>> data2 = data.select((col("id") + 10).alias("id"), col("tm"))
>>> data2.show()
+---+--------------------+
| id| tm|
+---+--------------------+
| 13|2020-06-23 10:37:...|
| 14|2020-06-23 10:37:...|
| 15|2020-06-23 10:37:...|
| 11|2020-06-23 10:37:...|
| 12|2020-06-23 10:37:...|
+---+--------------------+
# write the data
>>> data2.write.format("org.apache.spark.sql.cassandra")\
.options(table="t2", keyspace="test").mode("append").save()
# check that data is written
>>> spark.read.format("org.apache.spark.sql.cassandra")\
.options(table="t2", keyspace="test").load().count()
10