
I'm trying to flatten a dataframe in Databricks using PySpark. The source table is in Cassandra. This is the code I'm using:

df_vehicle_temp = (df_vehicle_temp
                   .withColumn('device_id_b', df_vehicle_temp.device_id)
                   .withColumn('timestamp_b', df_vehicle_temp.timestamp)
                   .withColumn('VEHICLE', df_vehicle_temp.ots)
                   .withColumn('dwellMilliseconds_vehicle', df_vehicle_temp.dwell_milliseconds)
                   .drop('ots', 'dwell_milliseconds','device_id', 'timestamp')
                  )
df_person_temp = cassandra_df.select('device_id', 'timestamp', 'ots', 'dwell_milliseconds', 'views').where(cassandra_df.category=='PERSON')
df_person_temp = (df_person_temp
                   .withColumn('PERSON', df_person_temp.ots)
                   .withColumn('dwellMilliseconds_person', df_person_temp.dwell_milliseconds)
                   .withColumn('views_person', df_person_temp.views)
                   .drop('ots', 'dwell_milliseconds', 'views')
                  )
cond = [df_person_temp.device_id == df_vehicle_temp.device_id_b,
        df_person_temp.timestamp == df_vehicle_temp.timestamp_b]
flattened_df = (df_person_temp
                .join(df_vehicle_temp, cond)
                .select('device_id', 'timestamp', 'PERSON', 'views_person',
                        'dwellMilliseconds_person', 'VEHICLE', 'dwellMilliseconds_vehicle'))

If some of the column renames or drops seem redundant, they probably are. This is the last code I tried before giving up.
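
For context, cassandra_df and df_vehicle_temp are created roughly like this (a minimal sketch; the keyspace and table names are placeholders, and the vehicle subset is assumed to be filtered on category == 'VEHICLE', mirroring the person subset):

# Sketch of the parts not shown above; "sensor_keyspace" and "detections" are placeholder names.
cassandra_df = (spark.read
                .format("org.apache.spark.sql.cassandra")
                .options(keyspace="sensor_keyspace", table="detections")
                .load())

# Assumed: the vehicle subset is selected the same way as the person subset.
df_vehicle_temp = (cassandra_df
                   .select('device_id', 'timestamp', 'ots', 'dwell_milliseconds')
                   .where(cassandra_df.category == 'VEHICLE'))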

The code runs fine, but when I try to display the dataframe I get the following error:

IllegalStateException: Unexpected partitioning: DataSourcePartitioning

But I don't understand where I'm going wrong.

The full error trace is very long, but I can post it if needed.

EDIT: here is the additional information requested

  • Databricks version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
  • Databricks cluster: Standard_DS3_v2 (14 GB, 2 cores)
  • Cassandra connector: com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
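
The connector is attached to the cluster as a library, and the Cassandra connection is configured roughly as follows (a sketch; the host and credentials below are placeholders, not the real values):

# Sketch of the Cassandra connection settings; host and credentials are placeholders.
spark.conf.set("spark.cassandra.connection.host", "cassandra.example.com")
spark.conf.set("spark.cassandra.connection.port", "9042")
spark.conf.set("spark.cassandra.auth.username", "my_user")      # placeholder
spark.conf.set("spark.cassandra.auth.password", "my_password")  # placeholder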

Full Trace of the error:

java.lang.IllegalStateException: Unexpected partitioning: DataSourcePartitioning
    at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec(partitioning.scala:245)
    at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec$(partitioning.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.DataSourcePartitioning.createShuffleSpec(DataSourcePartitioning.scala:27)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5(EnsureRequirements.scala:129)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5$adapted(EnsureRequirements.scala:124)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:124)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:471)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:441)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:629)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:629)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
    at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:45)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
    at org.apache.spark.sql.execution.CollectLimitExec.mapChildren(limit.scala:49)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:602)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:441)
    at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:718)
    at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:550)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$3(AdaptiveSparkPlanExec.scala:1089)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:1089)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.applyPhysicalRules(AdaptiveSparkPlanExec.scala:1088)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$initialPlan$1(AdaptiveSparkPlanExec.scala:293)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.<init>(AdaptiveSparkPlanExec.scala:292)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:83)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:42)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$2(QueryExecution.scala:596)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:596)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:595)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:232)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:268)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:228)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:222)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:298)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:361)
    at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:325)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:202)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3143)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:266)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:100)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:587)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:623)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:494)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:678)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:559)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:638)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:744)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:601)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:223)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:559)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:210)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$12(DriverLocal.scala:631)
    at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
    at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:59)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:59)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:608)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:615)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:607)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:526)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:561)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
    at java.lang.Thread.run(Thread.java:748)
    please add the stacktrace (or put it into pastebin/github gist and paste a link). Also provide information about versions of databricks runtime, spark cassandra connector, and other things – Alex Ott Oct 27 '22 at 19:10
  • Possible similar question/answer which might help: https://stackoverflow.com/questions/41784791/spark-hashpartitioner-unexpected-partitioning – Aaron Oct 28 '22 at 14:56
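
(As a rough illustration of the suggestion in the last comment, repartitioning both sides on the join keys before the join would look something like this; an untested sketch, not a confirmed fix:)

# Untested sketch based on the linked question: repartition both sides on the
# join keys before joining, instead of relying on the source's reported partitioning.
df_person_rep = df_person_temp.repartition('device_id', 'timestamp')
df_vehicle_rep = df_vehicle_temp.repartition('device_id_b', 'timestamp_b')
cond = [df_person_rep.device_id == df_vehicle_rep.device_id_b,
        df_person_rep.timestamp == df_vehicle_rep.timestamp_b]
flattened_df = df_person_rep.join(df_vehicle_rep, cond)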

0 Answers