I found that you cannot use Spark objects (such as the `map` function) inside a UDF, which makes sense (https://stackoverflow.com/a/57230637). An alternative way to do the operation you want is to use a for-loop inside the UDF.
1st EDIT
Added a part that makes it easy to apply this UDF to multiple columns, based on the answer to this question: How to get the name of the column with the maximum value in a PySpark DataFrame
# Sample input: two key columns plus two comma-separated value columns;
# in every sample row exactly one of column3 / column4 is filled.
df = spark.createDataFrame(
    [
        ('Node1', 'block1', '1,4,5', None),
        ('Node1', 'block1', None, '1,2,3'),
        ('Node1', 'block2', '3,6,7', None),
        ('Node1', 'block2', None, '4,5,6'),
        ('Node1', 'block1', None, '7,8,9'),
    ],
    schema=['column1', 'column2', 'column3', 'column4'],
)
# df.show()
# +-------+-------+-------+-------+
# |column1|column2|column3|column4|
# +-------+-------+-------+-------+
# | Node1| block1| 1,4,5| null|
# | Node1| block1| null| 1,2,3|
# | Node1| block2| 3,6,7| null|
# | Node1| block2| null| 4,5,6|
# | Node1| block1| null| 7,8,9|
# +-------+-------+-------+-------+
def columnfill(x):
    """Prefix each comma-separated value in *x* with ``counter<index>:``.

    Plain Python function intended to be wrapped in a Spark UDF, so it runs
    row by row on the executors.

    Args:
        x: A comma-separated string such as ``'1,4,5'``, or ``None`` (a null
            cell in the input column).

    Returns:
        ``None`` when *x* is ``None`` (nulls stay null). Otherwise the values
        re-joined with commas, e.g. ``'counter0:1,counter1:4,counter2:5'``.
        Note that an empty string is not passed through: it yields
        ``'counter0:'``.
    """
    # Only None is passed through unchanged; '' still goes through split().
    if x is None:
        return x
    # Each split() element is already a str, so only the index needs str().
    return ','.join(
        'counter' + str(index) + ':' + value
        for index, value in enumerate(x.split(','))
    )
udf_columnfill = udf(columnfill, StringType())
### Apply UDF to a single column:
# df_result1 = df.withColumn('distance', udf_columnfill(df.column3))
### Code for applying UDF to multiple columns
# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Apply the UDF to each column in turn: add '<name>_new' and drop the original.
# An explicit loop replaces eval() of a generated code string. It returns df
# unchanged when columnnames is empty; eval("df.withColumn") would instead
# return a bound method. Using df_result2[colname] rather than df.<colname>
# also works for column names that are not valid Python identifiers.
df_result2 = df
for colname in columnnames:
    df_result2 = df_result2.withColumn(colname + '_new', udf_columnfill(df_result2[colname])).drop(colname)
# Textual form of the transformation chain above, kept for printing/inspection:
cond = "df" + "".join(
    ".withColumn('" + str(c) + "_new', udf_columnfill(df." + str(c) + ")).drop('" + str(c) + "')"
    for c in columnnames
)
# # Print condition to see which transformations are executed
# print(cond)
# df.withColumn('column3_new', udf_columnfill(df.column3)).drop('column3').withColumn('column4_new', udf_columnfill(df.column4)).drop('column4')
# df_result2.show()
# +-------+-------+--------------------------------+--------------------------------+
# |column1|column2|column3_new |column4_new |
# +-------+-------+--------------------------------+--------------------------------+
# |Node1 |block1 |counter0:1,counter1:4,counter2:5|null |
# |Node1 |block1 |null |counter0:1,counter1:2,counter2:3|
# |Node1 |block2 |counter0:3,counter1:6,counter2:7|null |
# |Node1 |block2 |null |counter0:4,counter1:5,counter2:6|
# |Node1 |block1 |null |counter0:7,counter1:8,counter2:9|
# +-------+-------+--------------------------------+--------------------------------+
2nd EDIT
Added an extra UDF input through which the column name is passed in; it is used as the prefix for the column values:
# Updated UDF
def columnfill(cinput, cname):
    """Prefix each comma-separated value in *cinput* with ``<cname><index>:``.

    Plain Python function intended to be wrapped in a Spark UDF. The prefix is
    supplied per call (below it is passed as a literal column holding the new
    column's name).

    Args:
        cinput: A comma-separated string such as ``'1,4,5'``, or ``None`` (a
            null cell in the input column).
        cname: The prefix; converted with ``str()`` before use.

    Returns:
        ``None`` when *cinput* is ``None`` (nulls stay null). Otherwise the
        values re-joined with commas, e.g.
        ``'column3_new0:1,column3_new1:4,column3_new2:5'``. An empty string
        yields ``'<cname>0:'``.
    """
    # Only None is passed through unchanged; '' still goes through split().
    if cinput is None:
        return cinput
    prefix = str(cname)
    return ','.join(
        prefix + str(count) + ':' + value
        for count, value in enumerate(cinput.split(','))
    )
udf_columnfill = udf(columnfill, StringType())
# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Apply the UDF to each column in turn: add '<name>_new', whose values are
# prefixed with that new column name (passed to the UDF as a literal column),
# and drop the original. An explicit loop replaces eval() of a generated code
# string. It returns df unchanged when columnnames is empty, and indexing with
# df_result3[colname] also works for names that are not Python identifiers.
df_result3 = df
for colname in columnnames:
    new_colname = colname + '_new'
    df_result3 = df_result3.withColumn(new_colname, udf_columnfill(df_result3[colname], f.lit(new_colname))).drop(colname)
# Textual form of the transformation chain above, kept for printing/inspection:
cond2 = "df" + "".join(
    ".withColumn('" + str(c) + "_new', udf_columnfill(df." + str(c) + ", f.lit('" + str(c) + "_new'))).drop('" + str(c) + "')"
    for c in columnnames
)
# df_result3.show()
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |column1|column2|column3_new |column4_new |
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |Node1 |block1 |column3_new0:1,column3_new1:4,column3_new2:5|null |
# |Node1 |block1 |null |column4_new0:1,column4_new1:2,column4_new2:3|
# |Node1 |block2 |column3_new0:3,column3_new1:6,column3_new2:7|null |
# |Node1 |block2 |null |column4_new0:4,column4_new1:5,column4_new2:6|
# |Node1 |block1 |null |column4_new0:7,column4_new1:8,column4_new2:9|
# +-------+-------+--------------------------------------------+--------------------------------------------+
# Print the chain for this edit (cond2), not the earlier cond.
print(cond2)
# df.withColumn('column3_new', udf_columnfill(df.column3, f.lit('column3_new'))).drop('column3').withColumn('column4_new', udf_columnfill(df.column4, f.lit('column4_new'))).drop('column4')