0

Suppose I have a pandas dataframe called df

id  value1  value2
1   2       1
2   2       1
3   4       5

In plain Python, I wrote a function to process this dataframe and return a dictionary:

d = dict()
for row in df.itertuples():
    x = do_something(row)
    d[x[0]] = x[1:]

I am trying to reimplement this function using Spark.

from pyspark.sql.functions import udf

d = dict()  # define a global var

def do_something(id, value1, value2):
    # business logic computes x0, x1, x2, x3 from the inputs
    d[x0] = [x1, x2, x3]
    return 0

udf_do = udf(do_something)

then:

df_spark.select(udf_do('id', 'value1', 'value2'))

My idea is that by calling df_spark.select, the function do_something will be called over the dataframe and will update the global variable d. I don't really care about the return value of udf_do, so I just return 0.

However, my solution does not work.

Could you suggest some ways to iterate through the dataframe (I know it is not the Spark way), or some other way to process a Spark dataframe and update an external dictionary?

Note that the dataframe is quite large. I tried to convert it to pandas by calling toPandas(), but I ran into OOM problems.

mommomonthewind
  • 4,390
  • 11
  • 46
  • 74
  • Seems you are looking for something similar to this [answer](https://stackoverflow.com/a/45881682/8240561) – Prem Aug 25 '17 at 19:10

1 Answer

0

A UDF cannot update any global state on the driver, because it runs on the executors. However, you can do the business logic inside the UDF and then use toLocalIterator to bring all the data to the driver in a memory-efficient way (partition by partition). For example:

df = spark.createDataFrame([(10, 'b'), (20, 'b'), (30, 'c'),
                            (40, 'c'), (50, 'c'), (60, 'a')], ['col1', 'col2'])
df = df.withColumn('udf_result', ......)  # apply your UDF here
df.cache()
df.count()  # force cache fill

# iterate over the rows on the driver, one partition at a time
for row in df.toLocalIterator():
    print(row)
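
To connect this back to your original goal, a minimal sketch of filling an external dictionary on the driver could look like the following (assuming udf_result is the column produced by your UDF and col1 can serve as the key; both names are placeholders for your own schema):

d = dict()
for row in df.toLocalIterator():
    # rows arrive on the driver partition by partition, so memory stays bounded
    d[row['col1']] = row['udf_result']

This keeps all updates to d on the driver, which avoids the problem of executors writing to a global variable that the driver never sees.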
Mariusz
  • 13,481
  • 3
  • 60
  • 64