
I have the following PySpark input DataFrame:

+-------+------------+
| index | valuelist  |
+-------+------------+
| 1.0   | [10,20,30] |
| 2.0   | [11,21,31] |
| 0.0   | [14,12,15] |
+-------+------------+

Where:

  • index: type Double
  • valuelist: type Vector (it is NOT an Array)

From the above input DataFrame, I want to get the following output DataFrame in PySpark:

+-------+-------+
| index | value |
+-------+-------+
| 1.0   | 20    |
| 2.0   | 31    |
| 0.0   | 14    |
+-------+-------+

Logic:

for each row:
  value = valuelist[index] 
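For reference, one way to construct this input is the sketch below; it assumes Spark ML's Vectors.dense for the Vector column and an active SparkSession named spark (neither appears in the original post):

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()

# valuelist is a DenseVector (VectorUDT), not an ArrayType column
df = spark.createDataFrame(
    [(1.0, Vectors.dense([10, 20, 30])),
     (2.0, Vectors.dense([11, 21, 31])),
     (0.0, Vectors.dense([14, 12, 15]))],
    ["index", "valuelist"],
)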
msank
2 Answers


Spark version 1.5 and higher

You can use pyspark.sql.functions.expr to pass a column value as an input to a function:

import pyspark.sql.functions as f

df.select("index", f.expr("valuelist[CAST(index AS integer)]").alias("value")).show()
#+-----+-----+
#|index|value|
#+-----+-----+
#|  1.0|   20|
#|  2.0|   31|
#|  0.0|   14|
#+-----+-----+
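Note that [] indexing in a SQL expression applies to array columns; if valuelist really is an ML Vector as the question states, this expression may not work as-is. On Spark 3.0+ one could first convert the Vector to an array with pyspark.ml.functions.vector_to_array, as in this sketch (an aside, not part of the original answer):

from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as f

# convert the VectorUDT column to ArrayType so that [] indexing applies
df_arr = df.withColumn("valuelist", vector_to_array(f.col("valuelist")))
df_arr.select("index", f.expr("valuelist[CAST(index AS integer)]").alias("value")).show()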

Spark version 2.1 and higher

If you have spark version 2.1 or higher, here's an alternative using pyspark.sql.functions.posexplode:

import pyspark.sql.functions as f

# explode the list with positions, then keep the rows where the position matches the index
df.select("index", f.posexplode("valuelist").alias("pos", "value"))\
    .where(f.col("index").cast("int") == f.col("pos"))\
    .select("index", "value")\
    .show()
#+-----+-----+
#|index|value|
#+-----+-----+
#|  1.0|   20|
#|  2.0|   31|
#|  0.0|   14|
#+-----+-----+
pault

You can create a new column with a udf, passing the two columns as a single struct input.

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

columns = ['index', 'valuelist']
vals = [
    (0, [1, 2]),
    (1, [1, 2])
]

df = sqlContext.createDataFrame(vals, columns)

# pack (valuelist, index) into a struct; the udf indexes the list by the index
df = df.withColumn(
    "value",
    udf(lambda valuelist_and_index: valuelist_and_index[0][valuelist_and_index[1]], IntegerType())(
        F.struct(F.col("valuelist"), F.col("index"))
    )
)

This gives the following output:

+-----+---------+-----+
|index|valuelist|value|
+-----+---------+-----+
|    0|   [1, 2]|    1|
|    1|   [1, 2]|    2|
+-----+---------+-----+
Nanda
    This will cause an error in the OP's code because their `index` column is not an integer, however that's easy to fix by adding a `cast`. Regardless, I think your `udf` is overly complicated/hard to understand- if you have to use one here, I'd go with something like `df = df.withColumn("value", udf(lambda valuelist, index: valuelist[index], IntegerType())(F.col("valuelist"), F.col("index").cast("int")))` instead – pault Aug 03 '18 at 19:51
  • giving error `name 'udf' is not defined`. Any idea? – msank Aug 04 '18 at 04:53
  • `from pyspark.sql.functions import udf` – pault Aug 04 '18 at 05:30
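Putting pault's comments above together, a self-contained version of the simpler udf might look like the following sketch (it assumes an active SparkSession named spark, which is not part of the original answer; the cast handles the double-typed index):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(0.0, [1, 2]), (1.0, [1, 2])], ["index", "valuelist"])

# pass the two columns directly instead of packing them into a struct
extract = udf(lambda valuelist, index: valuelist[index], IntegerType())
df = df.withColumn("value", extract(F.col("valuelist"), F.col("index").cast("int")))
df.show()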