
I have the following data:

+-------------+
|    card type|
+-------------+
|ColonialVoice|
| SuperiorCard|
|        Vista|
|  Distinguish|
+-------------+

I have an array defining a custom order, and I want the dataset to be ordered as given in that array.

[ "Distinguish", "Vista", "ColonialVoice", "SuperiorCard"]

Expected output:

+-------------+
|    card type|
+-------------+
|  Distinguish|
|        Vista|
|ColonialVoice|
| SuperiorCard|
+-------------+

How can I achieve this custom sorting with the Spark Java API? Help using any Spark API would be appreciated.

abiratsis
V__

2 Answers


A possible solution, assuming you have a good reason for wanting to do this:

  1. create a DataFrame from that array
  2. add a column of monotonically_increasing_id to that DataFrame
  3. join this DataFrame to your original DataFrame on the card type column
  4. order by the monotonically_increasing_id column
  5. drop the monotonically_increasing_id column

In Scala that would be:

import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = ...
val df = ...
val order = Array("Distinguish", "Vista", "ColonialVoice", "SuperiorCard")
import spark.implicits._

// name the column to match the join key (plain .toDF would call it "value")
val orderDF = order.toSeq.toDF("card type")
val orderDFWithId = orderDF.withColumn("id", monotonically_increasing_id)
val joined = df.join(orderDFWithId, Seq("card type"), "left_outer")
val sortedDF = joined.orderBy("id").drop("id")
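Since the question asks for the Java API, the index-based idea behind these steps can be sketched in plain Java (no Spark involved, so this only illustrates the logic of the join key; the class and method names are made up for illustration): map each value to its position in the custom-order array, then sort by that position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomOrder {
    // Sort values by their position in the given order list; unknown values go last
    static List<String> sortByOrder(List<String> values, List<String> order) {
        // value -> position map, analogous to the monotonically_increasing_id join key
        Map<String, Integer> idx = new HashMap<>();
        for (int i = 0; i < order.size(); i++) {
            idx.put(order.get(i), i);
        }
        List<String> out = new ArrayList<>(values);
        out.sort(Comparator.comparingInt(v -> idx.getOrDefault(v, Integer.MAX_VALUE)));
        return out;
    }

    public static void main(String[] args) {
        List<String> order = Arrays.asList("Distinguish", "Vista", "ColonialVoice", "SuperiorCard");
        List<String> cards = Arrays.asList("ColonialVoice", "SuperiorCard", "Vista", "Distinguish");
        System.out.println(sortByOrder(cards, order));
        // prints [Distinguish, Vista, ColonialVoice, SuperiorCard]
    }
}
```

In Spark the map lookup becomes a broadcast join against the small order DataFrame, which is what the steps above do.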
Jasper-M
  • This will affect performance, because there could be more columns to sort with different sorting options like asc, desc, or a custom sort as mentioned in the question. – V__ May 08 '19 at 12:03
  • Of course it will affect performance. I would say that sorting is not a very good idea anyway in a distributed setting. – Jasper-M May 08 '19 at 12:05
  • Yes, agreed, but it is a requirement of the feature I am working on. Is there any other way to achieve this? With partitioning I will face more difficulty with the above approach. – V__ May 08 '19 at 12:07
  • I think this approach is as good as you're going to get. If you broadcast the small orderDF (spark will probably do this automatically) then it's pretty low cost. You could alternatively write a UDF to generate the sort column. – user2682459 May 08 '19 at 13:19
  • Alternatively you could create the `id` (or whatever you want to call it) column before converting to a `DataFrame` (`zipWithIndex` comes to mind), but I don't know if that would really make a difference. You'll always have to do some sort of joining. Either explicitly like here, or by using a `udf` like in the other answer. You'd have to benchmark to know which way is more efficient (if any). – Jasper-M May 13 '19 at 09:38

Here is another approach, which looks up the index of each card type in the desired array and assigns it to a new column. We can achieve that by utilizing the Spark functions array and array_position, the latter introduced in Spark 2.4:

import org.apache.spark.sql.functions.{array, array_position, lit, udf}
// assumes an active SparkSession with spark.implicits._ in scope (for .toDF and $"...")

val cardTypes = Seq("Distinguish", "Vista", "ColonialVoice", "SuperiorCard")

val df = Seq(
  "ColonialVoice",
  "SuperiorCard",
  "Vista",
  "Distinguish")
  .toDF("card_type")

df.withColumn("card_indx",
    array_position(array(cardTypes.map(t => lit(t)): _*), $"card_type"))
  .orderBy("card_indx")
  .drop("card_indx")
  .show

// +-------------+
// |    card_type|
// +-------------+
// |  Distinguish|
// |        Vista|
// |ColonialVoice|
// | SuperiorCard|
// +-------------+

First we create an array column from the content of the cardTypes Seq with array(cardTypes.map(t => lit(t)): _*), then extract the position of the current card_type in that array and assign it to a new column card_indx. Finally, we order by card_indx.

For Spark < 2.4.0, where array_position is not available, you can use a UDF instead:

// note: indexOf is 0-based (array_position is 1-based), but the relative order is the same
val getTypesIndx = udf((types: Seq[String], cardt: String) => types.indexOf(cardt))

df.withColumn("card_indx", getTypesIndx(array(cardTypes.map(t => lit(t)): _*), $"card_type"))
  .orderBy("card_indx")
  .drop("card_indx")
  .show
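One semantic difference worth knowing, sketched here in plain Java with a hypothetical helper (not Spark code): array_position is 1-based and returns 0 when the value is missing from the array, whereas indexOf is 0-based and returns -1 when it is missing. Under an ascending orderBy, both therefore sort unmatched card types before all matched ones, and the relative order of the listed types is identical.

```java
import java.util.Arrays;
import java.util.List;

public class ArrayPositionDemo {
    // Mimics Spark's array_position semantics: 1-based index, 0 when the value is absent
    static long arrayPosition(List<String> arr, String value) {
        int i = arr.indexOf(value);
        return i < 0 ? 0 : i + 1;
    }

    public static void main(String[] args) {
        List<String> order = Arrays.asList("Distinguish", "Vista", "ColonialVoice", "SuperiorCard");
        System.out.println(arrayPosition(order, "Vista"));    // 2 (1-based)
        System.out.println(order.indexOf("Vista"));           // 1 (0-based)
        System.out.println(arrayPosition(order, "GoldCard")); // 0: unmatched values sort first
    }
}
```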
abiratsis