1

I have a spark dataframe like this:

id |            Operation                 |        Value 
----------------------------------------------------------- 
1  | [Date_Min, Date_Max, Device]         | [148590, 148590, iphone]     
2  | [Date_Min, Date_Max, Review]         | [148590, 148590, Good]     
3  | [Date_Min, Date_Max, Review, Device] | [148590, 148590, Bad,samsung]     

The resul that i expect:

id | Operation |  Value |
-------------------------- 
1  | Date_Min  | 148590 |
1  | Date_Max  | 148590 |
1  | Device    | iphone |
2  | Date_Min  | 148590 |
2  | Date_Max  | 148590 |
2  | Review    | Good   |
3  | Date_Min  | 148590 |
3  | Date_Max  | 148590 |
3  | Review    | Bad    |
3  | Review    | samsung|

I'm using Spark 2.1.0 with pyspark. I tried this solution but it worked only for one column.

Thanks

Community
  • 1
  • 1
Omar14
  • 2,007
  • 4
  • 21
  • 34
  • I still cannot figure out the good way to do this particular task. I tried to explode columns separately `df1 = df.select('id', explode(col("Operation")))`, `df2 = df.select('id', explode(col("Value")))`. However, there is no good solution on how to horizontally stack two dataframe together. – titipata Mar 17 '17 at 19:07

2 Answers2

3

Here is an example dataframe from above. I use this solution in order to solve your question.

df = spark.createDataFrame(
     [[1, ['Date_Min', 'Date_Max', 'Device'], ['148590', '148590', 'iphone']], 
      [2, ['Date_Min', 'Date_Max', 'Review'], ['148590', '148590', 'Good']],     
      [3, ['Date_Min', 'Date_Max', 'Review', 'Device'], ['148590', '148590', 'Bad', 'samsung']]], 
     schema=['id', 'l1', 'l2'])

Here, you can define udf to zip two list together for each row first.

from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, explode

zip_list = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      StructField("first", StringType()),
      StructField("second", StringType())
  ]))
)

Finally, you can zip two columns together then explode that column.

df_out = df.withColumn("tmp", zip_list('l1', 'l2')).\
    withColumn("tmp", explode("tmp")).\
    select('id', col('tmp.first').alias('Operation'), col('tmp.second').alias('Value'))
df_out.show()

Output

+---+---------+-------+
| id|Operation|  Value|
+---+---------+-------+
|  1| Date_Min| 148590|
|  1| Date_Max| 148590|
|  1|   Device| iphone|
|  2| Date_Min| 148590|
|  2| Date_Max| 148590|
|  2|   Review|   Good|
|  3| Date_Min| 148590|
|  3| Date_Max| 148590|
|  3|   Review|    Bad|
|  3|   Device|samsung|
+---+---------+-------+
Community
  • 1
  • 1
titipata
  • 5,321
  • 3
  • 35
  • 59
  • No problem @Omar14! – titipata Mar 20 '17 at 14:59
  • Finally, i still have a problem with the function zip_list. When i work with Zeppelin notebook it works but when i tried to automate the job and the script with spark-submit, the job failed with this error: `zip argument #1 must support iteration` – Omar14 May 09 '17 at 13:52
-1

If using DataFrame then try this:-

import pyspark.sql.functions as F

your_df.select("id", F.explode("Operation"), F.explode("Value")).show()
Rakesh Kumar
  • 4,319
  • 2
  • 17
  • 30