
My function get_data returns a tuple: two integer values.

get_data_udf = udf(lambda id: get_data(spark, id), (IntegerType(), IntegerType()))

I need to split them into two columns val1 and val2. How can I do it?

dfnew = df \
    .withColumn("val", get_data_udf(col("id")))

Should I save the tuple in a column, e.g. val, and then split it into two columns somehow, or is there a shorter way?

Markus

3 Answers


You can declare the UDF's return type as a StructType with one StructField per tuple element, and then access each field by name later.

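from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, IntegerType

# Declare a struct return type so each tuple element becomes a named field
get_data_udf = udf(lambda id: get_data(spark, id),
      StructType([StructField('first', IntegerType()), StructField('second', IntegerType())]))
dfnew = df \
    .withColumn("val", get_data_udf(col("id"))) \
    .select('*', col('val.first').alias('first'), col('val.second').alias('second'))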
hamza tuna

Tuples can be indexed just like lists, so you can take the value for the first column as get_data()[0] and the value for the second column as get_data()[1].

You can also write v1, v2 = get_data() to unpack the returned tuple into the variables v1 and v2.
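A minimal plain-Python sketch of both options, assuming a hypothetical stand-in get_data() that takes no arguments and always returns (1, 2) (the asker's real function takes spark and id). The call counter illustrates the trade-off Markus raises in the comments: indexing the result twice calls the function twice, while unpacking calls it once.

```python
# Hypothetical stand-in for the asker's get_data: returns a tuple of two ints
# and counts how many times it has been called.
calls = 0


def get_data():
    global calls
    calls += 1
    return (1, 2)


# Indexing: each expression calls get_data() again
val1 = get_data()[0]
val2 = get_data()[1]
print(val1, val2, calls)  # 1 2 2

# Unpacking: a single call supplies both values
calls = 0
v1, v2 = get_data()
print(v1, v2, calls)  # 1 2 1
```

This is only the plain-Python analogy; inside a Spark DataFrame the equivalent of "call once, then split" is storing the UDF result in a temporary column, as suggested in the comments.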

Take a look at this question here for further clarification.

Cut7er
  • If I do `withColumn("val1", get_data_udf(col("id"))[0]).withColumn("val2", get_data_udf(col("id"))[1])`, then I will call `get_data_udf` twice, won't I? – Markus Sep 09 '18 at 16:33
  • Also, how can I run `v1, v2 = get_data()`, if I run this function row-wise in the DataFrame? – Markus Sep 09 '18 at 16:34
  • Put the first in a loop and append v1 and v2 to your df row by row; that's how it could work! – Cut7er Sep 09 '18 at 16:35
  • Can you please add an example? Loops are not normally used in distributed programming, so maybe I misunderstood your idea. An example would be helpful. – Markus Sep 09 '18 at 16:38
  • @Markus: If you don't want to run the udf twice then you need to save the result in a separate column temporarily. – Shaido Sep 10 '18 at 05:22

For example, suppose you have a sample DataFrame with one column, like the one below:

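import org.apache.spark.sql.functions.{col, typedLit}
import spark.implicits._  // needed for toDF()

val df = sc.parallelize(Seq(3)).toDF()
df.show()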


// A plain Scala function (not a UDF) that returns a tuple

def tupleFunction(): (Int, Int) = (1, 2)

// Turn the tuple into a string array literal, then pull each element into its own column.
// Note: the new columns hold strings, and every row gets the same constant values.

df.withColumn("newCol", typedLit(tupleFunction().toString.replace("(", "").replace(")", "")
  .split(","))).select((0 to 1)
  .map(i => col("newCol").getItem(i).alias(s"newColFromTuple$i")): _*).show()


Chandan Ray