I have a DataFrame with the following schema:
root
|-- clip_id: string (nullable = true)
|-- frames: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- frame_id: string (nullable = true)
| | |-- data_source_info: array (nullable = true)
| | | |-- element: struct (containsNull = false)
| | | | |-- data_source_path: string (nullable = true)
| | | | |-- sub_rules: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- device: string (nullable = true)
| | | | |-- file_type: string (nullable = true)
| | | | |-- md5: string (nullable = true)
Here is my code (Spark version 3.0.2):
data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
  .withColumn("data_source_info",
    struct(col("data_source_path"), col("sub_rules"), col("device"), col("file_type"), col("md5")))
  .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
  .groupBy("clip_id", "frame_id")
  .agg(collect_list("data_source_info").as("data_source_info"))
  .withColumn("frames", struct(col("frame_id"), col("data_source_info")))
  .sort(col("clip_id").asc, col("frame_id").asc)
  .groupBy(col("clip_id"))
  .agg(collect_list("frames").asc_nulls_first.as("frames"))
What I want is to sort the frames by frame_id, but I get this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 2447, 10.134.64.140, executor 39): java.lang.UnsupportedOperationException: Cannot evaluate expression: input[1, array<struct<frame_id:string,data_source_info:array<struct<data_source_path:string,sub_rules:array<string>,device:string,file_type:string,md5:string>>>>, true] ASC NULLS FIRST
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:301)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:300)
at org.apache.spark.sql.catalyst.expressions.SortOrder.eval(SortOrder.scala:62)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:76)
at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:32)
at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:658)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1492)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I also tried another approach using a UDF:
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

def frameIdSort(frames: WrappedArray[GenericRowWithSchema]): WrappedArray[GenericRowWithSchema] =
  frames.map(x => (x.getAs[String]("frame_id"), x)).sortBy(_._1).map(_._2)
but that fails with a different error:
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not supported
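One workaround I'm considering for this UDF error (untested sketch): declare the parameter as `Seq[Row]` instead of `GenericRowWithSchema`, and hand Spark the return type explicitly through the untyped `udf(f, dataType)` overload, since it cannot derive a schema for `Row` on its own. Note that on Spark 3.x this overload is deprecated and, as far as I know, requires `spark.sql.legacy.allowUntypedScalaUDF=true`. Here `df` stands for the grouped DataFrame that already has the `frames` column:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Untested sketch: Row is the supported external type for struct elements.
// The return DataType is passed explicitly because Spark cannot derive an
// encoder for Row. Requires spark.sql.legacy.allowUntypedScalaUDF=true on 3.0+.
val framesType = df.schema("frames").dataType // array<struct<frame_id, ...>>

val sortFrames = udf(
  (frames: Seq[Row]) => frames.sortBy(_.getAs[String]("frame_id")),
  framesType)

val sorted = df.withColumn("frames", sortFrames(col("frames")))
```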
So what can I do to sort the frames column by frame_id?
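For reference, the direction I'm currently leaning toward (untested sketch): drop the `SortOrder` entirely and apply `array_sort` to the aggregated column instead. As I understand it, `array_sort` compares structs field by field, and `frame_id` is the first field of the `frames` struct, so this should order each clip's frames by `frame_id`:

```scala
import org.apache.spark.sql.functions._

// Untested sketch: sort the collected array after aggregation with
// array_sort, rather than attaching a SortOrder to collect_list.
// array_sort compares structs by their fields in declaration order,
// and frame_id is the first field of the frames struct.
val result = data
  .select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
  .withColumn("data_source_info",
    struct(col("data_source_path"), col("sub_rules"), col("device"), col("file_type"), col("md5")))
  .groupBy("clip_id", "frame_id")
  .agg(collect_list("data_source_info").as("data_source_info"))
  .withColumn("frames", struct(col("frame_id"), col("data_source_info")))
  .groupBy("clip_id")
  .agg(array_sort(collect_list("frames")).as("frames"))
```

Note that the intermediate `.sort(...)` before the final `groupBy` is dropped here: ordering before an aggregation is not guaranteed to survive the shuffle anyway, which is why sorting the array after `collect_list` seems like the safer route. But I'm not sure this is the idiomatic way.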