I have a web service built around Spark that, based on a JSON request, builds a series of dataframe/dataset operations.

These operations involve multiple joins, filters, etc. that change the ordering of the values in the columns. The final dataset could contain millions of rows.

Preferably without converting it to an RDD, is there any way to apply a custom sort on some columns of the final dataset, based on the order of elements passed in as lists?

The original dataframe is of the form

+----------+----------+
| Column 1 | Column 2 |
+----------+----------+
| Val 1    | val a    |
+----------+----------+
| Val 2    | val b    |
+----------+----------+
| val 3    | val c    |
+----------+----------+

After a series of transformations is performed, the dataframe ends up looking like this:

+----------+----------+----------+----------+
| Column 1 | Column 2 | Column 3 | Column 4 |
+----------+----------+----------+----------+
| Val 2    | val b    | val 999  | val 900  |
+----------+----------+----------+----------+
| Val 1    | val c    | val 100  | val 9$#@ |
+----------+----------+----------+----------+
| val 3    | val a    | val 2##  | val $#@8 |
+----------+----------+----------+----------+

I now need to sort on multiple columns based on the order of the values passed in as an array list.

For example:
Col1 values order = [val 1, val 3, val 2]
Col3 values order = [100, 2##, 999]

ZygD
vva
  • I suppose that by custom sort order, you're looking to pass a sort of function to `sort()`. But isn't it possible, or even easier, to compute that sort key and add it as a column by which you'd then sort? – ernest_k Jan 28 '20 at 05:15
  • Please provide the code you have written so far, along with example input data and the expected output, because it is not clear what you want to achieve. – kavetiraviteja Jan 28 '20 at 07:29
  • I think this thread will help you. https://stackoverflow.com/questions/55128213/how-to-sort-dataframe-with-my-comparator-using-scala – kavetiraviteja Jan 28 '20 at 07:35
  • Could you provide more detail on that "custom sort"? – Oli Jan 28 '20 at 11:46
  • I have edited the question to add more details. – vva Jan 28 '20 at 16:31

1 Answer

Custom sorting works by creating a column for sorting. It does not need to be a visible column inside the dataframe. I can show it using PySpark.

Initial df:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 'a', 'A'),
     (2, 'a', 'B'),
     (3, 'a', 'C'),
     (4, 'b', 'A'),
     (5, 'b', 'B'),
     (6, 'b', 'C'),
     (7, 'c', 'A'),
     (8, 'c', 'B'),
     (9, 'c', 'C')],
    ['id', 'c1', 'c2']
)

Custom sorting on 1 column:

from itertools import chain

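# Rank for each value of c1: lower ranks sort first.
order = {'b': 1, 'a': 2, 'c': 3}

# Build a literal map column from the dict and look up each row's c1 in it.
# The expression is used only as a sort key; no column is added to df.
sort_col = F.create_map([F.lit(x) for x in chain(*order.items())])[F.col('c1')]
df = df.sort(sort_col)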

df.show()
# +---+---+---+
# | id| c1| c2|
# +---+---+---+
# |  5|  b|  B|
# |  6|  b|  C|
# |  4|  b|  A|
# |  1|  a|  A|
# |  2|  a|  B|
# |  3|  a|  C|
# |  7|  c|  A|
# |  8|  c|  B|
# |  9|  c|  C|
# +---+---+---+

On 2 columns:

from itertools import chain

order1 = {'b': 1, 'a': 2, 'c': 3}
order2 = {'B': 1, 'C': 2, 'A': 3}

sort_col1 = F.create_map([F.lit(x) for x in chain(*order1.items())])[F.col('c1')]
sort_col2 = F.create_map([F.lit(x) for x in chain(*order2.items())])[F.col('c2')]
df = df.sort(sort_col1, sort_col2)

df.show()
# +---+---+---+
# | id| c1| c2|
# +---+---+---+
# |  5|  b|  B|
# |  6|  b|  C|
# |  4|  b|  A|
# |  2|  a|  B|
# |  3|  a|  C|
# |  1|  a|  A|
# |  8|  c|  B|
# |  9|  c|  C|
# |  7|  c|  A|
# +---+---+---+
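
As a sanity check on what the two map lookups compute, the same ordering can be reproduced in plain Python without Spark. This is only an illustrative sketch: the `rows` list mirrors the sample dataframe above, and the tuple key plays the role of `(sort_col1, sort_col2)`.

```python
# Same sample data as the dataframe above: (id, c1, c2)
rows = [
    (1, 'a', 'A'), (2, 'a', 'B'), (3, 'a', 'C'),
    (4, 'b', 'A'), (5, 'b', 'B'), (6, 'b', 'C'),
    (7, 'c', 'A'), (8, 'c', 'B'), (9, 'c', 'C'),
]

order1 = {'b': 1, 'a': 2, 'c': 3}
order2 = {'B': 1, 'C': 2, 'A': 3}

# Sort by the rank of c1 first, then by the rank of c2.
sorted_rows = sorted(rows, key=lambda r: (order1[r[1]], order2[r[2]]))
ids = [r[0] for r in sorted_rows]

print(ids)  # [5, 6, 4, 2, 3, 1, 8, 9, 7]
```

The ids come out in the same order as in the `df.show()` output above.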

Or as a function:

from itertools import chain
def cust_sort(col: str, order: dict):
    return F.create_map([F.lit(x) for x in chain(*order.items())])[F.col(col)]

df = df.sort(
    cust_sort('c1', {'b': 1, 'a': 2, 'c': 3}),
    cust_sort('c2', {'B': 1, 'C': 2, 'A': 3})
)
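
Since the question passes the desired order as lists rather than dicts, the rank dict can be derived from the list positions. The following is a minimal sketch under that assumption; `order_from_list` and `sort_key_from_list` are hypothetical helper names, and sending unlisted values to the end with `F.coalesce` is my own choice, relying on a missing map key looking up as null (the default behaviour as far as I know).

```python
from itertools import chain


def order_from_list(values):
    """Map each value to its position in the list, e.g. ['x', 'y'] -> {'x': 0, 'y': 1}.

    If a value appears more than once, its last position wins.
    """
    return {value: rank for rank, value in enumerate(values)}


def sort_key_from_list(col, values):
    """Build a sort expression for column `col` that follows the order of `values`.

    Hypothetical helper: the name and the "unlisted values go last" rule
    are assumptions, not part of the approach shown above.
    """
    from pyspark.sql import functions as F  # imported lazily; requires PySpark

    order = order_from_list(values)
    rank = F.create_map([F.lit(x) for x in chain(*order.items())])[F.col(col)]
    # A value missing from the map is expected to look up as null;
    # give it a rank larger than every listed value so it sorts last.
    return F.coalesce(rank, F.lit(len(order)))


# Usage, assuming the `df` defined earlier:
# df = df.sort(
#     sort_key_from_list('c1', ['b', 'a', 'c']),
#     sort_key_from_list('c2', ['B', 'C', 'A']),
# )
```

Without the `coalesce`, rows whose value is not in the list would get a null sort key, and nulls come first in an ascending sort.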
ZygD