
I want to write Structured Streaming data into Cassandra using the PySpark API.

My data flow is like below:

NiFi -> Kafka -> Spark Structured Streaming -> Cassandra

I have tried the following approach:

query = df.writeStream\
  .format("org.apache.spark.sql.cassandra")\
  .option("keyspace", "demo")\
  .option("table", "test")\
  .start()

But I am getting the following error message: "org.apache.spark.sql.cassandra" does not support streaming write.

I have also tried another approach: [Source - DSE 6.0 Administrator Guide]

query = df.writeStream\
   .cassandraFormat("test", "demo")\
   .start()

But got an exception: AttributeError: 'DataStreamWriter' object has no attribute 'cassandraFormat'

Can anyone give me some idea of how I can proceed further?

Thanks in advance.


3 Answers


After upgrading to DSE 6.0 (the latest version), I am able to write Structured Streaming data into Cassandra. [Spark 2.2 & Cassandra 3.11]

Reference Code:

# a checkpoint location is required for streaming sinks such as this one
query = fileStreamDf.writeStream\
 .option("checkpointLocation", '/tmp/check_point/')\
 .format("org.apache.spark.sql.cassandra")\
 .option("keyspace", "analytics")\
 .option("table", "test")\
 .start()

DSE documentation URL: https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/structuredStreaming.html
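
Since the original pipeline reads from Kafka, the streaming DataFrame itself can be created with Spark's Kafka source. Below is a minimal PySpark sketch of that upstream step; the broker address, topic name, and JSON schema are hypothetical placeholders, not taken from the original setup:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("kafka-to-cassandra").getOrCreate()

# hypothetical schema for the JSON payload produced by NiFi
schema = StructType([
    StructField("key", StringType()),
    StructField("value", StringType())
])

# "localhost:9092" and "demo_topic" are placeholder values
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "demo_topic") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")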

  • Did you have to use Spark bundled with DSE in addition to using Cassandra from DSE? I tried with the Spark libraries in IDEA but couldn't get rid of "org.apache.spark.sql.cassandra does not support streamed writing" error. Btw, I've found this discussion: https://groups.google.com/a/lists.datastax.com/forum/#!msg/spark-connector-user/0aHJ4oskw7Q/xPKoqrtVAgAJ – BJ_ Sep 14 '18 at 21:54
  • I had similar issue. I could not make it work using DataProc Spark. – Farshad Javadi Oct 05 '18 at 22:54

This answer is for writing data to open-source Cassandra, not DSE (which supports Structured Streaming for storing data out of the box).

For Spark 2.4.0 and higher, you can use the foreachBatch method, which allows you to use the Cassandra batch data writer provided by the Spark Cassandra Connector to write the output of every micro-batch of the streaming query to Cassandra:

import org.apache.spark.sql.cassandra._

df.writeStream
  .foreachBatch { (batchDF, _) =>
    // each micro-batch is a regular DataFrame, so the
    // connector's batch writer can be used on it directly
    batchDF
     .write
     .cassandraFormat("tableName", "keyspace")
     .mode("append")
     .save
  }.start
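
Since the question asks for the PySpark API, a rough Python equivalent of the same foreachBatch approach (also Spark 2.4+) might look like this; the keyspace, table, and checkpoint path are placeholders:

def write_to_cassandra(batch_df, batch_id):
    # each micro-batch is a static DataFrame, so the connector's
    # batch writer can be used on it
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "keyspace") \
        .option("table", "tableName") \
        .mode("append") \
        .save()

df.writeStream \
    .foreachBatch(write_to_cassandra) \
    .option("checkpointLocation", "/tmp/check_point/") \
    .start()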

For Spark versions lower than 2.4.0, you need to implement a foreach sink.

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.Statement
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ForeachWriter, Row}

class CassandraSink(sparkConf: SparkConf) extends ForeachWriter[Row] {
    // no per-partition setup is needed, so always accept the partition
    def open(partitionId: Long, version: Long): Boolean = true

    def process(row: Row): Unit = {
      def buildStatement: Statement =
        QueryBuilder.insertInto("keyspace", "tableName")
          .value("key", row.getAs[String]("value"))
      // CassandraConnector caches sessions per JVM, so this does not
      // open a new connection for every row
      CassandraConnector(sparkConf).withSessionDo { session =>
        session.execute(buildStatement)
      }
    }

    def close(errorOrNull: Throwable): Unit = {}
}

And then you can use the foreach sink as follows:

df.writeStream
 .foreach(new CassandraSink(spark.sparkContext.getConf))
 .start
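
For PySpark specifically, note that DataStreamWriter.foreach only became available in Python in Spark 2.4, so there is no direct pre-2.4 Python equivalent. As a sketch, the same row-at-a-time pattern in Python could be built on the DataStax cassandra-driver package; the contact point, keyspace, table, and column names below are hypothetical placeholders mirroring the Scala example:

from cassandra.cluster import Cluster  # pip install cassandra-driver

class CassandraSink:
    def open(self, partition_id, epoch_id):
        # one connection per partition; "127.0.0.1" is a placeholder
        self.cluster = Cluster(["127.0.0.1"])
        self.session = self.cluster.connect("keyspace")
        return True

    def process(self, row):
        # same placeholder table/column names as the Scala example
        self.session.execute(
            "INSERT INTO tableName (key) VALUES (%s)",
            (row["value"],)
        )

    def close(self, error):
        self.cluster.shutdown()

df.writeStream \
    .foreach(CassandraSink()) \
    .start()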
  • thank you but i am not using DSE cassandra ... I am using spark-sql-2.4.x with spark-cassandra-connector-2_11 with cassandra 3.0 , does this work with it ? – BdEngineer May 31 '19 at 06:04
  • Yes, my answer is for writing into Cassandra, not DSE. You can use foreachBatch for Spark 2.4.x – Harichandan Pulagam Jun 01 '19 at 01:05
  • even though it is working fine ... it is taking a lot of time to produce results... i tried with "trigger" option with 15 seconds....i.e... companyDfAfterJoin .writeStream() .trigger(Trigger.ProcessingTime("15 seconds")) .foreachBatch((batchDf, batchId) -> {..... what should i do make it processing in every 15 seconds. – BdEngineer Jun 03 '19 at 06:50
  • Getting error : org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;; Project [company_id#80, company_name#66, year#81, quarter#82, mean_revenu – BdEngineer Jun 03 '19 at 08:48
  • I followed this link ... https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#semantic-guarantees-of-aggregation-with-watermarking .... it messed up with above error. – BdEngineer Jun 03 '19 at 12:50
  • https://stackoverflow.com/questions/56428367/analysisexception-append-output-mode-not-supported-when-there-are-streaming-agg – BdEngineer Jun 03 '19 at 13:27

Not much you can do here other than:

  • Following (and voting for) the corresponding JIRA.
  • Implementing required functionality and opening PR yourself.

Other than that, you can just use a foreach sink and write directly, as shown in the answer above.