
I have a CSV file in an HDFS location and have converted it to a DataFrame. My DataFrame looks like this:

column1,column2,column3
Node1,  block1, 1,4,5
Node1,  block1, null
Node1,  block2, 3,6,7
Node1,  block2, null
Node1,  block1, null

I would like to parse this DataFrame so that my output DataFrame looks like this:

column1,column2,column3
Node1,  block1, counter0:1,counter1:4,counter2:5
Node1,  block1, null
Node1,  block2, counter0:3,counter1:6,counter2:7
Node1,  block2, null
Node1,  block1, null

I am getting the error shown below the code. Can anyone please help me resolve it, or suggest corrected code? Thank you.

import pyspark
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.types as T
from pyspark.sql.functions import udf

start_value = 2
schema_name = 2
start_key = 0

df = spark.read.csv("hdfs://path/Ccounters/test.csv",header=True)

def dict(x):
    split_col = x.split(",")
    col_nm = df.schema.names[schema_name]
    convert = map(lambda x :col_nm + str(start_key) +":"+str(x) ,split_col)
    con_str = ','.join(convert)
    return con_str
udf_dict = udf(dict, StringType())

df1 =df.withColumn('distance', udf_dict(df.column3))
df1.show()

I get the following error:

 File "/opt/data/data11/yarn/local/usercache/cdap/appcache/application_1555606923440_67815/container_e48_1555606923440_67815_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 160, in dump
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o58.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
JPS
Rajesh Meher

1 Answer


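I found that you cannot use Spark objects inside a UDF, which makes sense (https://stackoverflow.com/a/57230637). In your function the Spark object is the DataFrame df, read through df.schema.names: Spark has to pickle the function to send it to the executors, and a DataFrame cannot be pickled. Python's built-in map is not the cause. An alternative way to do the operation you want is to use a for-loop in the UDF and keep the DataFrame out of it.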
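A minimal sketch of that idea, assuming the object that fails to pickle in the question is `df` inside the function (`df.schema.names`). The column name is read into a plain string first, so the function that would be wrapped in a UDF only closes over a string. The names `make_prefixer` and `label` are hypothetical, and the Spark wiring is left as comments so the formatting logic can be checked in plain Python.

```python
def make_prefixer(prefix):
    """Return a plain function that labels comma-separated values as prefix<index>:value."""
    def prefixer(value):
        # Nulls arrive in a UDF as None; pass them through unchanged
        if value is None:
            return None
        parts = value.split(",")
        return ",".join("{}{}:{}".format(prefix, i, part) for i, part in enumerate(parts))
    return prefixer

# On the driver (assumption, not run here): read the name into a plain string,
#   col_nm = df.schema.names[2]
# and only then build the UDF from it:
#   udf_prefix = udf(make_prefixer(col_nm), StringType())
label = make_prefixer("counter")
print(label("1,4,5"))  # counter0:1,counter1:4,counter2:5
print(label(None))     # None
```

Because `prefixer` captures only the string `prefix`, nothing tied to the Spark session ends up in the pickled closure.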


1st EDIT

Added a part that applies this UDF to multiple columns, based on the answer to this question: how to get the name of column with maximum value in pyspark dataframe

df = spark.createDataFrame([('Node1', 'block1', '1,4,5', None), ('Node1', 'block1', None, '1,2,3'), ('Node1', 'block2', '3,6,7', None), ('Node1', 'block2', None, '4,5,6'), ('Node1', 'block1', None, '7,8,9')], ['column1', 'column2', 'column3', 'column4'])

#     df.show()
#     +-------+-------+-------+-------+
#     |column1|column2|column3|column4|
#     +-------+-------+-------+-------+
#     |  Node1| block1|  1,4,5|   null|
#     |  Node1| block1|   null|  1,2,3|
#     |  Node1| block2|  3,6,7|   null|
#     |  Node1| block2|   null|  4,5,6|
#     |  Node1| block1|   null|  7,8,9|
#     +-------+-------+-------+-------+

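from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def columnfill(x):
    # Pass nulls through unchanged
    if x is None:
        return x
    # Prefix each comma-separated value with 'counter<index>:'
    y = []
    for z, i in enumerate(x.split(',')):
        y.append('counter' + str(z) + ':' + str(i))
    return ','.join(y)

udf_columnfill = udf(columnfill, StringType())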

### Apply UDF to a single column:
# df_result1 = df.withColumn('distance', udf_columnfill(df.column3))

### Code for applying UDF to multiple columns

# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Create a condition that joins multiple string parts, containing column operations
cond = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ")).drop('"+ str(c) +"')" for c in (columnnames)])

#     # Print condition to see which transformations are executed
#     print(cond)
#     df.withColumn('column3_new', udf_columnfill(df.column3)).drop('column3').withColumn('column4_new', udf_columnfill(df.column4)).drop('column4')   

# Create the new dataframe that evaluates the defined condition
df_result2 = eval(cond)

#     df_result2.show()
#     +-------+-------+--------------------------------+--------------------------------+
#     |column1|column2|column3_new                     |column4_new                     |
#     +-------+-------+--------------------------------+--------------------------------+
#     |Node1  |block1 |counter0:1,counter1:4,counter2:5|null                            |
#     |Node1  |block1 |null                            |counter0:1,counter1:2,counter2:3|
#     |Node1  |block2 |counter0:3,counter1:6,counter2:7|null                            |
#     |Node1  |block2 |null                            |counter0:4,counter1:5,counter2:6|
#     |Node1  |block1 |null                            |counter0:7,counter1:8,counter2:9|
#     +-------+-------+--------------------------------+--------------------------------+   
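The chain of `withColumn(...).drop(...)` calls that is assembled as a string and passed to `eval` can also be built directly, which avoids evaluating generated code. The sketch below is one possible alternative, not part of the original answer: `transform_columns` is a hypothetical helper, and it only assumes that `df` supports `withColumn`, `drop`, and `df[name]`, as a PySpark DataFrame does.

```python
from functools import reduce

def transform_columns(df, columnnames, column_udf):
    """Replace each column in columnnames with '<name>_new' = column_udf(column)."""
    def step(acc, c):
        # Same pair of calls as one segment of the eval string
        return acc.withColumn(c + "_new", column_udf(acc[c])).drop(c)
    # Fold over the column names, starting from the original DataFrame
    return reduce(step, columnnames, df)

# Usage with the names defined earlier in this answer (not run here):
# df_result2 = transform_columns(df, ['column3', 'column4'], udf_columnfill)
```

Each step works on the DataFrame returned by the previous one, so no intermediate DataFrames need to be joined.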

2nd EDIT

Added a second UDF argument that takes the column name, which is then used as the prefix for the column values:

# Updated UDF
import pyspark.sql.functions as f  # f.lit is used in cond2 below

def columnfill(cinput, cname):
    # Pass nulls through unchanged
    if cinput is None:
        return cinput
    # Prefix each comma-separated value with '<cname><index>:'
    output = []
    for count, value in enumerate(cinput.split(',')):
        output.append(str(cname) + str(count) + ":" + str(value))
    return ','.join(output)

udf_columnfill = udf(columnfill, StringType())

# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Create a condition that joins multiple string parts, containing column operations
cond2 = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ", f.lit('" + str(c) + "_new'))).drop('"+ str(c) +"')" for c in (columnnames)])

df_result3 = eval(cond2)
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |column1|column2|column3_new                                 |column4_new                                 |
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |Node1  |block1 |column3_new0:1,column3_new1:4,column3_new2:5|null                                        |
# |Node1  |block1 |null                                        |column4_new0:1,column4_new1:2,column4_new2:3|
# |Node1  |block2 |column3_new0:3,column3_new1:6,column3_new2:7|null                                        |
# |Node1  |block2 |null                                        |column4_new0:4,column4_new1:5,column4_new2:6|
# |Node1  |block1 |null                                        |column4_new0:7,column4_new1:8,column4_new2:9|
# +-------+-------+--------------------------------------------+--------------------------------------------+

print(cond2)
# df.withColumn('column3_new', udf_columnfill(df.column3, f.lit('column3_new'))).drop('column3').withColumn('column4_new', udf_columnfill(df.column4, f.lit('column4_new'))).drop('column4')
JPS
  • Thank you very much, this worked. However, it handles just one column. If several columns in the DataFrame need the conversion, we could use a for loop, but wouldn't that create many DataFrames that then have to be joined? I am trying to avoid joining DataFrames, because reshuffling the data might lead to an incorrect DataFrame at the end. Is there another method? – Rajesh Meher Sep 04 '19 at 19:04
  • Evaluating multiple columns at once using a UDF or joining multiple dataframes is not a desired solution I think. I added some new code to my answer, which applies the UDF for each column that has to be transformed. To keep it a bit clean, it also drops this column after applying, but this part can be skipped if preferred. – JPS Sep 05 '19 at 11:03
  • Yeah, I did it the same way you described above. Thank you very much for your help. – Rajesh Meher Sep 06 '19 at 16:47
  • Great that it worked. Can you accept the answer to show that it provided the solution to your question? – JPS Sep 09 '19 at 08:06
  • How do I replace the counter prefix with the actual column name? For example, for column "column3_new" the output should be column3_new0:1,column3_new1:4,column3_new2:5 and so on ... – Rajesh Meher Sep 11 '19 at 07:54
  • ColName contains the list of columns. I used the code below, but it did not work: for i in ColName: df_new = newdf.withColumn(i,when((col(i)=='my_s'),i).otherwise(col(i))) ... df_new.show() Any pointers? – Rajesh Meher Sep 11 '19 at 08:22
  • @RajeshMeher I think that your question is getting too specific now for general usage in other cases. My advice is to stick to your first question and ask a new question for every addition you have (e.g. 'add column name as prefix for column values'). In this way the shared knowledge is more applicable for other users too. Nonetheless, I added an edit to the solution which holds an extra UDF input where the column value prefix can be defined. Hopefully it works as intended. – JPS Sep 12 '19 at 12:05