
Env: Azure Databricks cluster 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12)

I have a pandas_udf. It works for 4 rows, but when I try more than 4 rows I get the error below.

PythonException: 'RuntimeError: The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was 1 and the length of input was 2.'.

Please find the code below:

from typing import Iterator
import re

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

data = [
    {"inputData": "<html>Tanuj is older than Eina. Chetan is older than Tanuj. Eina is older than Chetan. If the first 2 statements are true, the 3rd statement is"},
    {"inputData": "<html>Pens cost more than pencils. Pens cost less than eraser. Erasers cost more than pencils and pens. If the first two statements are true, the third statement is"},
    {"inputData": "<html>If we have a tree of n nodes, how many edges will it have?"},
    {"inputData": "<div>Which of the following data structures can handle updates and queries in log(n) time on an array?"},
]
df = spark.createDataFrame(data)

# removing HTML tags from the input text
@pandas_udf(StringType())
def clean_html(raw_htmls: Iterator[pd.Series]) -> Iterator[pd.Series]:
    pd.set_option('display.max_colwidth', 10000)
    for raw_html in raw_htmls:
        cleanr_regx = re.compile("<.*?>|&([a-z0-9]+|#0-9{1,6}|#x[0-9a-f]{1,6});")
        cleantext = re.sub(cleanr_regx, " ", raw_html.to_string(index=False))
        cleantext = re.sub(" +", " ", cleantext)
        yield pd.Series(cleantext)

df = df.withColumn("Question", clean_html("inputData"))
display(df)

It works fine. But if I add one more row to data, I get the error mentioned above.

data = [
    {"inputData": "<div>Look at this series: 36, 34, 30, 28, 24, … What number should come next?"},
    {"inputData": "<html>Tanuj is older than Eina. Chetan is older than Tanuj. Eina is older than Chetan. If the first 2 statements are true, the 3rd statement is"},
    {"inputData": "<html>Pens cost more than pencils. Pens cost less than eraser. Erasers cost more than pencils and pens. If the first two statements are true, the third statement is"},
    {"inputData": "<html>If we have a tree of n nodes, how many edges will it have?"},
    {"inputData": "<div>Which of the following data structures can handle updates and queries in log(n) time on an array?"},
]

In my project I am reading the data from a JSON file, and I see the same issue there: with 1 row it works, but with more than 1 row I get the same error.

Can anyone please help me? I have been stuck on this error for a week.

Ancil Pa
  • Why not use `regexp_replace` instead? `final = df.select("inputData", regexp_replace("inputData", "<.*?>|&([a-z0-9]+|#0-9{1,6}|#x[0-9a-f]{1,6});", "").alias('Question'))` generates the output exactly as per your requirement – Saideep Arikontham Jan 19 '23 at 08:07
  • Hi @SaideepArikontham, thank you, it's working. But I am getting the same error in other pandas_udf functions. Can you please explain, or correct me if I am doing anything wrong – Ancil Pa Jan 19 '23 at 09:09
  • Hey @AncilPa, I am not sure why the error occurs. The official documentation also does not have many details about pandas UDFs: https://learn.microsoft.com/en-us/azure/databricks/udf/pandas#:~:text=%23%20%7C%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%209%7C%0A%23%20%2B%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2D%2B-,Iterator%20of%20Series%20to%20Iterator%20of%20Series%20UDF,-An%20iterator%20UDF – Saideep Arikontham Jan 19 '23 at 09:11
  • Hi @SaideepArikontham, okay, thanks for the support. I am running about 60 lines of logic inside the pandas_udf per row. It works for 1 row, but for more than one row I get the above error – Ancil Pa Jan 19 '23 at 09:18
  • As the size of the data increases, the length of the input changes from 1 to other values, but the length of the output stays the same, i.e., 1. Not sure what the reason is. – Saideep Arikontham Jan 19 '23 at 10:18

1 Answer


You can achieve the same requirement with the `regexp_replace` function:

from pyspark.sql.functions import regexp_replace
final = df.select("inputData", regexp_replace("inputData", "<.*?>|&([a-z0-9]+|#0-9{1,6}|#x[0-9a-f]{1,6});", "").alias('Question'))
display(final)


  • As an alternative, to make the pandas UDF work, processing each row separately worked for me. Try the following code (tested with 100 rows):
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

# Assign a sequential row number so each row can be filtered out individually.
demo = df.withColumn('tp', lit(1))
demo = demo.withColumn("rn", row_number().over(Window.partitionBy("tp").orderBy("tp")))

# Apply the UDF one row at a time and union the results.
final = None
for i in range(1, df.count() + 1):
    row_cleaned = demo.filter(demo['rn'] == i).withColumn("Question", clean_html(col('inputData')))
    final = row_cleaned if final is None else final.union(row_cleaned)
final.show(100)
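A likely root cause, based on the error message rather than anything confirmed in this thread: an Iterator-of-Series pandas UDF must yield one output Series per input batch, and each output Series must have the same length as its input batch. `raw_html.to_string(index=False)` collapses an entire batch into a single string, so the yielded Series has length 1 whenever Spark hands the UDF a batch of more than one row, which it starts doing as the data grows. Applying the regex per element keeps the lengths aligned. A minimal, pandas-only sketch of a corrected body (wrap it with `@pandas_udf(StringType())` as in the question; `#[0-9]{1,6}` is assumed to be the intended form of the original pattern's `#0-9{1,6}`):

```python
import re
from typing import Iterator

import pandas as pd

# Same pattern as the question, with the character-class brackets assumed fixed.
TAG_RE = re.compile(r"<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});")

def clean_html_batches(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in batches:
        # Replace per element, so each yielded Series has the same length
        # as its input batch -- to_string() collapsed it to one string.
        cleaned = batch.str.replace(TAG_RE, " ", regex=True)
        yield cleaned.str.replace(r" +", " ", regex=True)
```

With this body, the original `df.withColumn("Question", clean_html("inputData"))` call should work regardless of how Spark batches the rows, with no per-row loop needed.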


Saideep Arikontham
  • Thanks @SaideepArikontham. If we use a loop, I think that is serial processing. In my case I need parallel processing to reduce the run time – Ancil Pa Jan 19 '23 at 14:35
  • Hey @AncilPa, that is true. I have given this workaround only so that you might be able to use your existing pandas UDF without the above error. I will update the answer if the cause of the error is identified. – Saideep Arikontham Jan 20 '23 at 01:24