0

I get a dataframe with schema as below:

root
|-- clip_id: string (nullable = true)
|-- frames: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- frame_id: string (nullable = true)
|    |    |-- data_source_info: array (nullable = true)
|    |    |    |-- element: struct (containsNull = false)
|    |    |    |    |-- data_source_path: string (nullable = true)
|    |    |    |    |-- sub_rules: array (nullable = true)
|    |    |    |    |    |-- element: string (containsNull = true)
|    |    |    |    |-- device: string (nullable = true)
|    |    |    |    |-- file_type: string (nullable = true)
|    |    |    |    |-- md5: string (nullable = true)

Here is my code, and my spark version is 3.0.2

data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
.withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"),col("device"),col("file_type"), col("md5")))
.drop("data_source_path", "sub_rules", "device", "file_type", "md5")
.groupBy("clip_id", "frame_id")
.agg(collect_list("data_source_info").as("data_source_info"))
.withColumn("frames", struct(col("frame_id"),col("data_source_info")))
.sort(col("clip_id").asc,col("frame_id").asc).groupBy(col("clip_id")
.agg(collect_list("frames").asc_null_first.as("frames"))

what I want is to sort the frames by frame_id, but i got error like this:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 2447, 10.134.64.140, executor 39): java.lang.UnsupportedOperationException: Cannot evaluate expression: input[1, array<struct<frame_id:string,data_source_info:array<struct<data_source_path:string,sub_rules:array<string>,device:string,file_type:string,md5:string>>>>, true] ASC NULLS FIRST
        at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:301)
        at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:300)
        at org.apache.spark.sql.catalyst.expressions.SortOrder.eval(SortOrder.scala:62)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
        at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:76)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:32)
        at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:658)
        at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
        at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1492)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


another way of using udf:

def frameIdSort(frames: WrappedArray[GenericRowWithSchema]): WrappedArray[GenericRowWithSchema] = frames.map(x => (x.getAs[String]("frame_id"), x)).sortBy(_._1).map(_._2)

but also got another error:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not supported

so what i can do to sort the column frames by frame_id?

2 Answers2

1

The error message Cannot evaluate expression: input[1, array... means that you cannot use asc_nulls_first inside agg (or select for that matter). It is an expression describing how a dataframe should be sorted and can only be used inside an orderBy or sort function.

What you seem to want however is not to sort the dataframe but to sort an array column inside the dataframe. For that, you can use array_sort and since you want to sort by frame_id, which is the first element, you don't have to change anything in the rest of the code:

data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
    .withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"),col("device"),col("file_type"), col("md5")))
    .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
    .groupBy("clip_id", "frame_id")
    .agg(collect_list("data_source_info").as("data_source_info"))
    .withColumn("frames", struct(col("frame_id"),col("data_source_info")))
    // .sort(col("clip_id").asc,col("frame_id").asc)
    .groupBy(col("clip_id")
    .agg(collect_list("frames") as "frames")
    .withColumn("frames", array_sort(col("frames")))

NB: I commented the sort because group by does not maintain order (See does groupBy after orderBy maintain that order?). You may put it back at the end if you like.

Oli
  • 9,766
  • 5
  • 25
  • 46
  • Have you tried? It works for me with exactly the same schema. – Oli Aug 21 '23 at 12:44
  • @Oil, Thank you for your reply. But column frames is not orderable for its complex schema,it will get an error some like "sort_array does not support sorting array of type xxx(a complex schema) which is not orderable " – zcfightings Aug 21 '23 at 13:06
  • yes i tried, my real schema is struct,datasource_size:bigint,source_md5:string,source_path:string,source_region:string,sub_rule_scores:map,sub_rules:array,datasource_timestamp:string,undistorted:boolean,undistortion:string,datasource_updated_at:timestamp,triage_tags:array,annotation_class:string,info:string,issue_type:string>>> is do not work – zcfightings Aug 21 '23 at 13:09
  • it really worked in my example – zcfightings Aug 21 '23 at 13:10
  • I got it. its the column sub_rule_scores who's schema is map that is not orderable – zcfightings Aug 21 '23 at 13:23
  • So you mean that it works with the example you provided but not with your full data? Do I understand correctly? – Oli Aug 21 '23 at 13:24
  • @Oil yes, because the full data schema has a column of map[String, double] type, and this column is not orderable, it works after I cast the map to string。 – zcfightings Aug 28 '23 at 09:34
  • ok, glad you could find a solution :) – Oli Aug 28 '23 at 12:45
0

this usally works for me:

from pyspark.sql import functions as F

df_exploded = df.select("clip_id", F.explode("frames").alias("frame_data"))

df_sorted = df_exploded.orderBy("clip_id", "frame_data.frame_id")

df_final = df_sorted.groupBy("clip_id").agg(F.collect_list("frame_data").alias("frames"))

df_final.show(truncate=False)
Aymen Azoui
  • 369
  • 2
  • 4