
I have two PySpark DataFrames:

1st dataframe: plants

 +-----+--------+
 |plant|station |
 +-----+--------+
 |Kech |    st1 |
 |Casa |    st2 |
 +-----+--------+

2nd dataframe: stations

 +-------+--------+
 |program|station |
 +-------+--------+
 |pr1    |    null|
 |pr2    |    st1 |
 +-------+--------+

What I want is to replace the null values in the second dataframe, stations, with all the values of the station column from the first dataframe, like this:

+-------+--------------+
|program|station       |
+-------+--------------+
|pr1    |    [st1, st2]|
|pr2    |    st1       |
+-------+--------------+
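
For reference, the two example DataFrames can be created like this (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the tables above
plants = spark.createDataFrame(
    [('Kech', 'st1'), ('Casa', 'st2')],
    ['plant', 'station'],
)
stations = spark.createDataFrame(
    [('pr1', None), ('pr2', 'st1')],
    ['program', 'station'],
)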

I did this:

from pyspark.sql import functions as F

stList = plants.select(F.col('station')).rdd.map(lambda x: x[0]).collect()
stations = stations.select(
    F.col('program'),
    F.when(stations.station.isNull(), stList).otherwise(stations.station).alias('station')
)

but it gives me an error because when() doesn't accept a Python list as a parameter.

3 Answers


Thanks for your replies.

I've found a solution by converting the column to a Python list via pandas:

stList = list(plants.select(F.col('station')).toPandas()['station'])

and then use:

F.when(stations.station.isNull(), F.array([F.lit(x) for x in stList])).otherwise(stations['station']).alias('station')

It gives an array directly.
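
Note that, as the third answer points out, the two branches of when/otherwise must resolve to a compatible type, so depending on the Spark version the otherwise branch may also need to be wrapped in F.array (as the next answer does).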


A quick workaround is F.lit(str(stList)); this should work. For better type casting, use the code below.

stations = stations.select(
    F.col('program'),
    F.when(stations.station.isNull(), F.array([F.lit(x) for x in stList]))
     .otherwise(F.array(stations.station))
     .alias('station')
)
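
Wrapping the otherwise branch in F.array keeps the station column a consistent array<string> type. Applied to the sample data, this should give [st1, st2] for pr1 and [st1] for pr2, matching the output shown in the next answer.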


Firstly, you can't keep different datatypes in the station column; it needs to be consistent (see the sketch after this table).

+-------+--------------+
|program|station       |
+-------+--------------+
|pr1    |    [st1, st2]| # this is array
|pr2    |    st1       | # this is string
+-------+--------------+
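
A quick way to see this (a minimal sketch; the exact exception message varies by Spark version):

from pyspark.sql import functions as F

# Mixing an array THEN-branch with a string ELSE-branch
# fails analysis with a 'data type mismatch' error:
stations.select(
    F.when(stations.station.isNull(), F.array(F.lit('st1'), F.lit('st2')))
     .otherwise(stations.station)  # string branch, incompatible with array
     .alias('station')
).show()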

Secondly, this should do the trick:

from pyspark.sql import functions as F

# Create the stList as a string.
stList = ",".join(plants.select(F.col('station')).rdd.map(lambda x: x[0]).collect()) 

# coalesce the variables and then apply pyspark.sql.functions.split function
stations = (stations.select(
    F.col('program'),
    F.split(F.coalesce(stations.station, F.lit(stList)), ",").alias('station')))
stations.show()

Output:

+-------+----------+
|program|   station|
+-------+----------+
|    pr1|[st1, st2]|
|    pr2|     [st1]|
+-------+----------+