I'm trying to flatten a DataFrame in Databricks using PySpark. The source table is in Cassandra. This is the code I'm using:
```python
# df_vehicle_temp is selected from cassandra_df earlier in the notebook,
# analogous to df_person_temp below but with category == 'VEHICLE'
# (that part of the code is not shown here)
df_vehicle_temp = (df_vehicle_temp
    .withColumn('device_id_b', df_vehicle_temp.device_id)
    .withColumn('timestamp_b', df_vehicle_temp.timestamp)
    .withColumn('VEHICLE', df_vehicle_temp.ots)
    .withColumn('dwellMilliseconds_vehicle', df_vehicle_temp.dwell_milliseconds)
    .drop('ots', 'dwell_milliseconds', 'device_id', 'timestamp')
)

df_person_temp = (cassandra_df
    .select('device_id', 'timestamp', 'ots', 'dwell_milliseconds', 'views')
    .where(cassandra_df.category == 'PERSON')
)
df_person_temp = (df_person_temp
    .withColumn('PERSON', df_person_temp.ots)
    .withColumn('dwellMilliseconds_person', df_person_temp.dwell_milliseconds)
    .withColumn('views_person', df_person_temp.views)
    .drop('ots', 'dwell_milliseconds', 'views')
)

cond = [df_person_temp.device_id == df_vehicle_temp.device_id_b,
        df_person_temp.timestamp == df_vehicle_temp.timestamp_b]
flattened_df = (df_person_temp
    .join(df_vehicle_temp, cond)
    .select('device_id', 'timestamp', 'PERSON', 'views_person',
            'dwellMilliseconds_person', 'VEHICLE', 'dwellMilliseconds_vehicle')
)
```
If some of the column renames or drops look redundant, they probably are; this is the last version of the code I tried before giving up. (The same flatten could also be written more compactly with select and alias; a sketch follows, assuming df_vehicle_temp is filtered from cassandra_df on category == 'VEHICLE' like the person side.)
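```python
from pyspark.sql import functions as F

# Sketch of an equivalent, more compact flatten using select/alias instead of
# withColumn + drop. Assumes df_vehicle_temp came from cassandra_df filtered
# on category == 'VEHICLE'; column names are the same as above.
vehicle = (cassandra_df
    .where(F.col('category') == 'VEHICLE')
    .select(F.col('device_id').alias('device_id_b'),
            F.col('timestamp').alias('timestamp_b'),
            F.col('ots').alias('VEHICLE'),
            F.col('dwell_milliseconds').alias('dwellMilliseconds_vehicle'))
)
person = (cassandra_df
    .where(F.col('category') == 'PERSON')
    .select('device_id', 'timestamp',
            F.col('ots').alias('PERSON'),
            F.col('views').alias('views_person'),
            F.col('dwell_milliseconds').alias('dwellMilliseconds_person'))
)
flattened_df = (person
    .join(vehicle, (person.device_id == vehicle.device_id_b) &
                   (person.timestamp == vehicle.timestamp_b))
    .select('device_id', 'timestamp', 'PERSON', 'views_person',
            'dwellMilliseconds_person', 'VEHICLE', 'dwellMilliseconds_vehicle')
)
```
The longer withColumn/drop version above is what I actually ran, though.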
The code itself runs without errors (all of these are lazy transformations), but as soon as I try to display the resulting DataFrame I get the following error:

IllegalStateException: Unexpected partitioning: DataSourcePartitioning

I don't understand where I'm going wrong. The full trace of the error is very long, but I can post it if needed.
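For clarity, the failure is triggered by the notebook display call; since the trace shows it dying while Spark prepares the physical plan, I'd expect any action (show(), count(), ...) to hit the same thing, though display is what I've been using:

```python
# Fails while Spark prepares the physical plan, with:
# IllegalStateException: Unexpected partitioning: DataSourcePartitioning
display(flattened_df)
```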
EDIT: here is the additional information requested:
- Databricks runtime: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
- Databricks cluster: Standard_DS3_v2 (14 GB, 2 cores)
- Cassandra connector: com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
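In case the read path matters, cassandra_df is loaded through the connector in the usual way (a sketch; the keyspace and table names here are placeholders, not the real ones):

```python
# Placeholder keyspace/table names; the real ones differ.
cassandra_df = (spark.read
    .format('org.apache.spark.sql.cassandra')
    .options(keyspace='my_keyspace', table='my_table')
    .load()
)
```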
Full trace of the error:

```
java.lang.IllegalStateException: Unexpected partitioning: DataSourcePartitioning
at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec(partitioning.scala:245)
at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec$(partitioning.scala:244)
at org.apache.spark.sql.execution.datasources.v2.DataSourcePartitioning.createShuffleSpec(DataSourcePartitioning.scala:27)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5(EnsureRequirements.scala:129)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5$adapted(EnsureRequirements.scala:124)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:124)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:471)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:441)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:629)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:629)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
at org.apache.spark.sql.execution.CollectLimitExec.mapChildren(limit.scala:49)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:602)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:441)
at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:718)
at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:550)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$3(AdaptiveSparkPlanExec.scala:1089)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:1089)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.applyPhysicalRules(AdaptiveSparkPlanExec.scala:1088)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$initialPlan$1(AdaptiveSparkPlanExec.scala:293)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.<init>(AdaptiveSparkPlanExec.scala:292)
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:83)
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:42)
at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$2(QueryExecution.scala:596)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:596)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:595)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:232)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:268)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:228)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:222)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:298)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:361)
at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:325)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:202)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3143)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:266)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:100)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:587)
at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:623)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:494)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:678)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:559)
at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:638)
at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:744)
at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:601)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:223)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:559)
at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:210)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$12(DriverLocal.scala:631)
at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:59)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:59)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:608)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:615)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:607)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:526)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:561)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
at java.lang.Thread.run(Thread.java:748)
```