1

This UDF is written to replace a column's value with a variable. Python 2.7; Spark 2.2.0

import pyspark.sql.functions as func

    def updateCol(col, st):
       return func.expr(col).replace(func.expr(col), func.expr(st))

  updateColUDF = func.udf(updateCol, StringType())

Variable L_1 to L_3 have updated columns for each row . This is how I am calling it:

updatedDF = orig_df.withColumn("L1", updateColUDF("L1", func.format_string(L_1))). \
                withColumn("L2", updateColUDF("L2", func.format_string(L_2))). \
                withColumn("L3", updateColUDF("L3", 
                withColumn("NAME", func.format_string(name)). \
                withColumn("AGE", func.format_string(age)). \
                select("id", "ts", "L1", "L2", "L3",
                     "NAME", "AGE")

The error is:

return Column(sc._jvm.functions.expr(str))
AttributeError: 'NoneType' object has no attribute '_jvm'
earl
  • 738
  • 1
  • 17
  • 38
  • Pyspark does support _lit_ please refer https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.lit – DataWrangler Oct 21 '19 at 09:41
  • Not with Python 2.7 I guess. Gives Cannot reference 'lit' in functions.py – earl Oct 21 '19 at 09:46
  • Can you do `from pyspark.sql.functions import *` this instead of `import pyspark.sql.functions as pyspark_func` – DataWrangler Oct 21 '19 at 09:48
  • Tried. It gives Unresolved reference 'lit' – earl Oct 21 '19 at 09:53
  • What is is that you want to do? Also, `lit` is supported in py2.7. – pissall Oct 21 '19 at 10:38
  • ```python 2.7.15 (default, Dec 12 2018, 18:50:28) [GCC 4.2.1 Compatible Apple LLVM 10.0.0 (clang-1000.10.44.4)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> import pyspark >>> from pyspark.sql.functions import lit >>> lit ``` – pissall Oct 21 '19 at 10:40
  • I am trying to replace six colum values with six variables for each record of my source dataframe. Stated in my question – earl Oct 21 '19 at 10:42
  • I can't understand why lit is not getting imported to my project. May be I am using older 2.7 version of py – earl Oct 21 '19 at 10:43
  • 1
    because it's not allowed to use dataframe API function in UDF. fix this first. – jxc Oct 21 '19 at 11:32

2 Answers2

1

Tried to create a sample dataframe and then make use of the lit function in the PySpark.

Seems to work fine, this is using the Databricks notebook

Python2

DataWrangler
  • 1,804
  • 17
  • 32
  • lit is getting picked on pyspark terminal on command prompt. But not on pycharm – earl Oct 22 '19 at 05:37
  • Cool, so thats an existing issue that read in some JIRA tickets, let me see if I can get them for you – DataWrangler Oct 22 '19 at 06:11
  • Just few links that seem to show the similar issue https://stackoverflow.com/questions/11725519/pycharm-shows-unresolved-references-error-for-valid-code https://stackoverflow.com/questions/33700328/pycharm-5-0-1-doesnt-resolve-builtin-modules-methods/37593582#37593582 https://jira.apache.org/jira/browse/SPARK-23878 – DataWrangler Oct 22 '19 at 06:14
  • Yes, I have fixed the lit issue on pycharm atleast. But my main issue of replacing column with corresponding issue is still at large – earl Oct 22 '19 at 06:21
  • just to get some clarity, your scala code that you mentioned using lit now as Lit is available in Pyspark doesnt that solve the issue. Also are you trying to make the string variable "one" to int variable 1 assuming taking the L1 value from the first row...? – DataWrangler Oct 22 '19 at 06:58
  • No. It does not solve the issue. It replaces the same value for all the rows. – earl Oct 22 '19 at 07:00
  • One to 1 is just an example. The API will do any transformation and return result. And i have to replace that transformed value with original one, per record of the main dataframe – earl Oct 22 '19 at 07:01
  • Can you try if this helps you https://pypi.org/project/word2number/ – DataWrangler Oct 22 '19 at 07:13
  • Done using some changes at the source API itself. Thanks for inputs – earl Oct 22 '19 at 09:49
  • @earl you can add the changes done to make it work maybe below your query or post an answer so that others who come can make use of it... :) – DataWrangler Oct 22 '19 at 10:00
  • Actually it was done at the source API itself. No code change was done . – earl Oct 22 '19 at 10:01
1

The error is because you are using pyspark functions inside a udf. It would also be very helpful to know the content of your L1, L2.. variables.

However, if I am understanding what you want to do correctly, you don't need a udf. I am assuming L1, L2 etc are constants, right? If not let me know to adjust the code accordingly. Here's an example:

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F


conf = SparkConf()
spark_session = SparkSession.builder \
    .config(conf=conf) \
    .appName('test') \
    .getOrCreate()

data = [{'L1': "test", 'L2': "data"}, {'L1': "other test", 'L2': "other data"}]
df = spark_session.createDataFrame(data)
df.show()

# +----------+----------+
# |        L1|        L2|
# +----------+----------+
# |      test|      data|
# |other test|other data|
# +----------+----------+

L1 = 'some other data'
updatedDF = df.withColumn(
    "L1",
    F.lit(L1)
)
updatedDF.show()
# +---------------+----------+
# |             L1|        L2|
# +---------------+----------+
# |some other data|      data|
# |some other data|other data|
# +---------------+----------+


# or if you need to replace the value in a more complex way
pattern = '\w+'
updatedDF = updatedDF.withColumn(
    "L1",
    F.regexp_replace(F.col("L1"), pattern, "testing replace")
)

updatedDF.show()
# +--------------------+----------+
# |                  L1|        L2|
# +--------------------+----------+
# |testing replace t...|      data|
# |testing replace t...|other data|
# +--------------------+----------+

# or even something more complicated:
# set L1 value to L2 column when L2 column equals to data, otherwise, just leave L2 as it is
updatedDF = df.withColumn(
    "L2",
    F.when(F.col('L2') == 'data', L1).otherwise(F.col('L2'))
)
updatedDF.show()

# +----------+---------------+
# |        L1|             L2|
# +----------+---------------+
# |      test|some other data|
# |other test|     other data|
# +----------+---------------+

So your example would be:

DF = orig_df.withColumn("L1", pyspark_func.lit(L_1))
...

Also, please make sure you have an active spark session before this point

I hope this helps.

Edit: If L1, L2 etc are lists, then one option is to create a dataframe with them and join to the initial df. We'll need indexes for the join unfortunately and since your dataframe is quite big, I don't think this is a very performant solution. We could also use broadcasts and a udf or broadcasts and join.

Here's a (suboptimal I think) example of how to do the join:

L1 = ['row 1 L1', 'row 2 L1']
L2 = ['row 1 L2', 'row 2 L2']

# create a df with indexes    
to_update_df = spark_session.createDataFrame([{"row_index": i, "L1": row[0], "L2": row[1]} for i, row in enumerate(zip(L1, L2))])

# add indexes to the initial df 
indexed_df = updatedDF.rdd.zipWithIndex().toDF()
indexed_df.show()
# +--------------------+---+
# | _1 | _2 |
# +--------------------+---+
# | [test, some other... | 0 |
# | [other test, othe... | 1 |
# +--------------------+---+

# bring the df back to its initial form
indexed_df = indexed_df.withColumn('row_number', F.col("_2"))\
    .withColumn('L1', F.col("_1").getItem('L1'))\
    .withColumn('L2', F.col("_1").getItem('L2')).\
    select('row_number', 'L1', 'L2')

indexed_df.show()
# +----------+----------+---------------+
# |row_number|        L1|             L2|
# +----------+----------+---------------+
# |         0|      test|some other data|
# |         1|other test|     other data|
# +----------+----------+---------------+

# join with your results and keep the updated columns
final_df = indexed_df.alias('initial_data').join(to_update_df.alias('other_data'), F.col('row_index')==F.col('row_number'), how='left')
final_df = final_df.select('initial_data.row_number', 'other_data.L1', 'other_data.L2')
final_df.show()

# +----------+--------+--------+
# |row_number|      L1|      L2|
# +----------+--------+--------+
# |         0|row 1 L1|row 1 L2|
# |         1|row 2 L1|row 2 L2|
# +----------+--------+--------+

This ^ can definitely be better in terms of performance.

mkaran
  • 2,528
  • 20
  • 23
  • I will try that. L1 to L6 are variables who are set by extracting tags from a SOAP API response XML. And they will keep on changing for every record. This makes my implementation a little tricky and difficult. – earl Oct 21 '19 at 15:10
  • Oh, if they keep changing for every record then it is a list, right? Hopefully with the same size as your DataFrame? – mkaran Oct 21 '19 at 15:14
  • Yes. I am populating these fields per dataframe row as a list – earl Oct 21 '19 at 15:15
  • How big is your dataframe and the lists? (it will determine how we go about this) – mkaran Oct 21 '19 at 15:30
  • Dataframe has 20-25 columns. List has 6 records per dataframe. Number. of records in the dataframe could be large. These 6 columns would replace it's parent 6 columns out of the 20-25 and remaining 14-19 would remain unchanged. I am using foreachPartition to call the API as creating XML payload from 6 columns per dataframe was a tough task – earl Oct 21 '19 at 15:35
  • Thank you for the details. From what I've understood so far, I edited the answer and added an example of how this could be done using joins. It can definitely be a *lot* better. – mkaran Oct 21 '19 at 16:01
  • 1
    I will give this a try and let you know. Cheers – earl Oct 21 '19 at 16:02
  • Just a small point. You are using two lists here. L1 and L2. In my case, there is just one list whose value keeps changing per iteration per API CALL per row of the main dataframe. Does that change approach? Because something similar i had tried and it gives desired result only on the first record because index colum remains 0 per iteration per API call per row and hence join condition satisfies only the first row. Does that change the approach? – earl Oct 21 '19 at 16:12
  • I would need more details to answer that. Could you update your answer with an example for your case? – mkaran Oct 21 '19 at 17:01
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/201220/discussion-between-earl-and-mkaran). – earl Oct 21 '19 at 17:39
  • edited my question. Excuse if any simple typo is made. – earl Oct 21 '19 at 17:40
  • 1
    Done using some changes at the source API itself. Thanks for inputs – earl Oct 22 '19 at 09:48