
I cannot find exactly what I am looking for, so here is my question. I fetch some data from MongoDB into a Spark DataFrame. The DataFrame has the following schema (df.printSchema):

|-- flight: struct (nullable = true)
|    |-- legs: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- arrival: timestamp (nullable = true)
|    |    |    |-- departure: timestamp (nullable = true)
|    |-- segments: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- arrival: timestamp (nullable = true)
|    |    |    |-- departure: timestamp (nullable = true)

Note the top-level struct followed by an array; it is inside the array elements that I need to change my data. For example:

{
  "flight": {
    "legs": [{
        "departure": ISODate("2020-10-30T13:35:00.000Z"),
        "arrival": ISODate("2020-10-30T14:47:00.000Z")
      }
    ],
    "segments": [{
        "departure": ISODate("2020-10-30T13:35:00.000Z"),
        "arrival": ISODate("2020-10-30T14:47:00.000Z")
      }
    ]
  }
}

I want to export this as JSON, but for business reasons I want the arrival dates to have a different format than the departure dates. For example, I may want to export the departure ISODate as ms from epoch, but not the arrival one.

To do so, I thought of applying a custom function to do the transformation:

  // Here I can do any transformation. I hope to replace the timestamp with the needed value.
  import java.sql.Timestamp
  import org.apache.spark.sql.expressions.UserDefinedFunction
  import org.apache.spark.sql.functions.udf

  val doSomething: UserDefinedFunction = udf((value: Seq[Timestamp]) => {
    value.map(x => "doSomething" + x.getTime)
  })

  val newDf = df.withColumn("flight.legs.departure",
    doSomething(df.col("flight.legs.departure")))

But this simply returns a brand new column containing an array with a single doSomething string. (withColumn treats "flight.legs.departure" as a literal top-level column name, not as a path into the nested structure, so it appends a new column instead of updating the nested field.)

{
  "flight": {
    "legs": [{
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z"
      }
    ],
    "segments": [{
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z",
      }
    ]
  },
  "flight.legs.departure": ["doSomething1596268800000"]
}

And newDf.show(1)

+--------------------+---------------------+
|              flight|flight.legs.departure|
+--------------------+---------------------+
|[[[182], 94, [202...| [doSomething15962...|
+--------------------+---------------------+

Instead of

{
  ...
        "arrival": "2020-10-30T14:47:00Z",
        //leg departure date that I changed
        "departure": "doSomething1596268800000"
  ...   // segments not affected in this example
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z",
 ...
}

Any ideas how to proceed?

Edit - clarification: Please bear in mind that my schema is far more complex than what is shown above. For example, there is yet another top-level data tag, so flight sits below it along with other information. Then, inside flight, legs and segments, there are many more elements, some of which are also nested. I only focused on the ones I needed to change.

I am saying this because I would like the simplest solution that would scale, i.e. ideally one that would simply change the required elements without having to de-construct and then re-construct the whole nested structure. If we cannot avoid that, is using case classes the simplest solution?
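For illustration, this is the kind of in-place update I am hoping for. This is only a sketch, assuming Spark 3.1+, where Column.withField and the transform higher-order function are available (I have not checked this against my actual Spark version):

  import org.apache.spark.sql.functions._
  // import spark.implicits._ gives the $"..." syntax (predefined in spark-shell)

  // transform() rewrites each element of the legs array, and withField()
  // replaces a single struct field in place, so the rest of the nested
  // structure is carried along untouched.
  val updated = df.withColumn(
    "flight",
    $"flight".withField(
      "legs",
      transform($"flight.legs", leg =>
        // cast("long") yields seconds since epoch; * 1000 approximates ms from epoch
        leg.withField("departure", (leg("departure").cast("long") * 1000).cast("string"))
      )
    )
  )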

user1485864
2 Answers


Please check the code below.

Execution Time

With UDF : Time taken: 679 ms

Without UDF : Time taken: 1493 ms

Code With UDF

scala> :paste
// Entering paste mode (ctrl-D to finish)

  // Creating a UDF to update values inside an array.
  import java.text.SimpleDateFormat
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss") // For me the departure values are strings, so this parses them before taking epoch millis.
  val doSomething = udf((value: Seq[String]) => {
    value.map(x => s"dosomething${dateFormat.parse(x).getTime}")
  })

// Exiting paste mode, now interpreting.

import java.text.SimpleDateFormat
dateFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@41bd83a
doSomething: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true))))


scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.time {
  // Run the UDF over legs.departure, zip the result back with legs.arrival,
  // and cast to restore the struct field names, then rebuild the flight struct.
  val updated = df.select("flight.*")
    .withColumn("legs",
      arrays_zip($"legs.arrival", doSomething($"legs.departure"))
        .cast("array<struct<arrival:string,departure:string>>"))
    .select(struct($"segments", $"legs").as("flight"))
  updated.printSchema
  updated.show(false)
}

// Exiting paste mode, now interpreting.

root
 |-- flight: struct (nullable = false)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)

+-------------------------------------------------------------------------------------------------+
|flight                                                                                           |
+-------------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, dosomething1604045100000]]]|
+-------------------------------------------------------------------------------------------------+

Time taken: 679 ms

scala>

Code Without UDF

scala> val df = spark.read.json(Seq("""{"flight": {"legs": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}],"segments": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}]}}""").toDS)
df: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<arrival:string,departure:string>>, segments: array<struct<arrival:string,departure:string>>>]

scala> df.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)


scala> df.show(false)
+--------------------------------------------------------------------------------------------+
|flight                                                                                      |
+--------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+--------------------------------------------------------------------------------------------+


scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.time {
val updated= df
            .select("flight.*")
            .select($"segments",$"legs.arrival",$"legs.departure") // extracting legs struct column values.
            .withColumn("departure",explode($"departure")) // exploding departure column
            .withColumn("departure",concat_ws("-",lit("something"),$"departure".cast("timestamp").cast("long"))) // updating departure column values
            .groupBy($"segments",$"arrival") // grouping columns except legs column
            .agg(collect_list($"departure").as("departure")) // constructing list back
            .select($"segments",arrays_zip($"arrival",$"departure").as("legs")) // construction arrival & departure columns using arrays_zip method.
            .select(struct($"legs",$"segments").as("flight")) // finally creating flight by combining legs & segments columns.

  updated.printSchema
  updated.show(false)
}

// Exiting paste mode, now interpreting.

root
 |-- flight: struct (nullable = false)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)

+---------------------------------------------------------------------------------------------+
|flight                                                                                       |
+---------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, something-1604045100]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+---------------------------------------------------------------------------------------------+

Time taken: 1493 ms

scala>

Srinivas
  • Thanks, I will try that. How can I get the $ symbol? For me, I always have to use df.col(".."). – user1485864 May 03 '20 at 11:22
  • You have to import spark.implicits._ to get that symbol – Srinivas May 03 '20 at 11:32
  • Thanks for taking the time to answer. I have added a clarification concerning the size / complexity of my schema. Do I need to reconstruct my schema in both of your proposals? – user1485864 May 03 '20 at 12:05
  • In both cases you have to reconstruct the schema back to the original after updating the values. If you have a complex schema, try the UDF approach; otherwise, the one without a UDF. – Srinivas May 03 '20 at 12:10
  • Is it possible to post your complete schema ? – Srinivas May 03 '20 at 12:11
  • If you look at the answer above, I am extracting only the required columns; the other columns are not touched, just referenced by name, because we need them in the final DataFrame. – Srinivas May 03 '20 at 12:14
  • I cannot post the whole schema online (intellectual property of my company). Indeed, the second solution seemed way more complex. I can check what I can post tomorrow. – user1485864 May 03 '20 at 12:19
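For reference, a minimal sketch of the import mentioned in the comments above (in spark-shell the spark value is predefined; in a standalone application you build it first, and the appName/master values here are placeholders):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate()
  import spark.implicits._ // enables the $"colName" syntax and Seq(...).toDS / .toDF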

Try this

scala> df.show(false)
+----------------------------------------------------------------------------------------------------------------+
|flight                                                                                                          |
+----------------------------------------------------------------------------------------------------------------+
|[[[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]], [[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]]]|
|[[[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]], [[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]]]|
+----------------------------------------------------------------------------------------------------------------+


scala> 

scala> df.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)


scala> 

scala> val myudf = udf(
     |   (arrs:Seq[String]) => {
     |     arrs.map("something" ++ _)
     |   } 
     | )
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true))))

scala> val df2 = df.select($"flight", myudf($"flight.legs.arr") as "editedArrs")
df2: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<dep:string,arr:string>>, segments: array<struct<dep:string,arr:string>>>, editedArrs: array<string>]

scala> df2.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |-- editedArrs: array (nullable = true)
 |    |-- element: string (containsNull = true)


scala> df2.show(false)
+----------------------------------------------------------------------------------------------------------------+-----------------------------------+
|flight                                                                                                          |editedArrs                         |
+----------------------------------------------------------------------------------------------------------------+-----------------------------------+
|[[[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]], [[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]]]|[something2020-10-30T14:47:00.000Z]|
|[[[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]], [[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]]]|[something2020-10-25T14:37:00.000Z]|
+----------------------------------------------------------------------------------------------------------------+-----------------------------------+


scala> 

scala> 

scala> val df3 = df2.select(struct(arrays_zip($"flight.legs.dep", $"editedArrs") cast "array<struct<dep:string,arr:string>>" as "legs", $"flight.segments") as "flight") // zip deps with the edited arrs, cast to restore field names, then rebuild flight
df3: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<dep:string,arr:string>>, segments: array<struct<dep:string,arr:string>>>]

scala> 

scala> df3.printSchema
root
 |-- flight: struct (nullable = false)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)


scala> 

scala> df3.show(false)
+-------------------------------------------------------------------------------------------------------------------------+
|flight                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------+
|[[[2020-10-30T13:35:00.000Z, something2020-10-30T14:47:00.000Z]], [[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]]]|
|[[[2020-10-25T13:15:00.000Z, something2020-10-25T14:37:00.000Z]], [[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]]]|
+-------------------------------------------------------------------------------------------------------------------------+


C.S.Reddy Gadipally
  • Thanks for taking the time to answer. If I understand correctly, your solution is applicable even though I may have a much larger schema than the one presented (see the edit in the question). What I mean is that I don't need to manually reconstruct the whole structure. – user1485864 May 03 '20 at 12:15
  • I don't see why you cannot. – C.S.Reddy Gadipally May 03 '20 at 15:43