Spark DataFrames have a withColumn method to add one new column at a time. To add multiple columns, a chain of withColumn calls is required. Is this really the best practice?
I feel that using mapPartitions has more advantages. Let's say I have a chain of three withColumn calls and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure whether any of them are wide transformations, though), but I can do them all in one go with mapPartitions. It also helps when I have a database connection that I would prefer to open once per RDD partition.
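To make the comparison concrete, this is roughly the DataFrame-only version I have in mind; the new columns and the filter condition below are made up purely to illustrate the shape of the chain:

from pyspark.sql import functions as F

# hypothetical chain: three withColumn calls, then one filter
df2 = (df
       .withColumn("new_col_1", F.concat(F.col("existing_col_2"), F.lit("_NEW")))
       .withColumn("new_col_2", F.col("existing_col_2"))
       .withColumn("new_col_3", F.length(F.col("existing_col_1")))
       .filter(F.col("new_col_3") > 0))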
My question has two parts.
The first part: here is my implementation using mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do it?
from pyspark.sql import Row

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

def add_new_cols(rows):
    # one connection per partition, shared by every row in that partition
    db = open_db_connection()
    new_rows = []
    # Row "template" with the output column names
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2,
                              new_col_1, new_col_2)
        new_rows.append(new_f_row)
    db.close()
    return iter(new_rows)
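In case it helps, this is the kind of minimal harness I use to try the function locally; the FakeDB class, open_db_connection, and the sample rows are just stand-ins for my real database client and data:

from pyspark.sql import SparkSession, Row

class FakeDB(object):
    # stand-in for the real database client
    def get_some_result(self, key):
        return "RESULT_FOR_" + str(key)
    def close(self):
        pass

def open_db_connection():
    return FakeDB()

spark = SparkSession.builder.master("local[2]").appName("map_partitions_test").getOrCreate()
df = spark.createDataFrame([
    Row(existing_col_1="a", existing_col_2="x"),
    Row(existing_col_1="b", existing_col_2="y"),
    Row(existing_col_1="c", existing_col_2="z"),
])
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
df2.show()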
The second part: what are the tradeoffs of using mapPartitions compared to a chain of withColumn and filter?
I read somewhere that using the built-in methods on Spark DataFrames is always better than rolling your own implementation. Please let me know if my reasoning is wrong. Thank you! All thoughts are welcome.