
I'm currently running into an error when using an external Python module (orjson) inside a foreach function with PySpark. Everything works fine if I use that module outside the foreach function (i.e. after collect()). Below is my code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
import orjson
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("HelloSparkSQL") \
        .getOrCreate()

    data = [[1, "male"], [2, "male"], [3, "female"], [4, "female"], [10, "male"]]
    schema = StructType([StructField("Age", IntegerType()), StructField("Gender", StringType())])
    surveyDF = spark.createDataFrame(data=data, schema=schema)

    countDF = surveyDF.select("Age", "Gender").limit(20)

    list1 = countDF.collect()
    for row in list1:
        data = {
            "age": row["Age"],
            "gender": row["Gender"]
        }
        newjson = orjson.dumps(data)
        print(newjson)

# b'{"age":1,"gender":"male"}'
# b'{"age":2,"gender":"male"}'
# b'{"age":3,"gender":"female"}'
# b'{"age":4,"gender":"female"}'
# b'{"age":10,"gender":"male"}'

But as you know, it's never a good idea to iterate over big data after calling collect(). So I use a simple foreach function instead, replacing everything from list1 onward with the following:

def jsontest(row):
    data = {
        "age": row["Age"],
        "gender": row["Gender"]
    }
    newjson = orjson.dumps(data)
    print(newjson)

countDF.foreach(jsontest)

Then I got this error:

  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\worker.py", line 668, in main
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\worker.py", line 85, in read_command
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\cloudpickle\cloudpickle.py", line 679, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'orjson'

I followed some guides on Stack Overflow (link), which say I have to add all the dependencies (in my case the orjson module) to a zip file and pass it with --py-files to spark-submit. But that didn't work either. Below is my orjson module folder:

[screenshot of the orjson package folder]

After zipping the folder and doing what the guide said, I ran into another error:

ModuleNotFoundError: No module named 'orjson.orjson' / 'orjson'

I think this method only works for a custom .py file with custom functions/modules. It won't work with a module installed via "pip install x". I had no luck opening the orjson.cp39-win_amd64.pyd file either.
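
For reference, the zip-and-ship approach from those guides boils down to something like the sketch below ("orjson.zip" is just a placeholder for the zipped module folder). Spark copies the archive to the executors and puts it on their sys.path, which is enough for pure-Python packages but, as the error above suggests, not for a compiled extension such as orjson.cp39-win_amd64.pyd:

# Rough sketch of the zip-and-ship approach (equivalent to passing --py-files).
# "orjson.zip" is a placeholder path to the zipped module folder.
spark.sparkContext.addPyFile("orjson.zip")

# The workers receive the archive and add it to sys.path before running this,
# but Python cannot import a compiled .pyd/.so module from inside a zip archive.
countDF.foreach(jsontest)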

  • I think the packages need to be on all nodes. – samkart Dec 12 '22 at 13:49
  • pip install orjson didn't work? – Abdennacer Lachiheb Dec 12 '22 at 14:35
  • I've already run pip install orjson, but it only works if I call the module outside the foreach function (the first case, using collect()). When I call orjson inside the foreach function it no longer works. I run everything in PyCharm on my local machine, but even at work I hit the same problem on the server with spark-submit: it only works if I call the external module outside foreach. – Hoang Minh Quang FX15045 Dec 12 '22 at 14:42
  • @samkart could you explain a bit more? I think that if I call a function after collect(), it runs on the driver node (which has the module), while if I call it inside foreach, it runs on the worker nodes, which somehow don't have the module. – Hoang Minh Quang FX15045 Dec 12 '22 at 14:46
  • yes, that's exactly my understanding of your case. When you run a `pip install`, it runs on the driver, not on the workers. – samkart Dec 12 '22 at 14:48
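
A minimal sketch (names are illustrative) that makes the point from the comments concrete: the loop over collect() runs in the driver process, while the function passed to foreach runs inside the executor processes, which on a real cluster live on other machines and only see whatever is installed there:

import os
import socket

# Sketch: show which process/host actually executes the function passed to foreach.
# With local[*] the hostname is the same but the PIDs differ from the driver's;
# on a real cluster the hostnames differ as well.
def where_am_i(row):
    print("executor:", socket.gethostname(), os.getpid(), row["Age"])

print("driver:  ", socket.gethostname(), os.getpid())
countDF.foreach(where_am_i)  # these prints go to the executors' stdout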

1 Answer


Turns out that in a case like this, I have to SSH into every node and install the Python package on each of them. My advice is to move to Scala: with Scala, all the dependency libraries are distributed to every node automatically when you run spark-submit.
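
If you do go the install-on-every-node route, a quick sanity check along these lines (a minimal sketch; the function and variable names are made up) confirms that each executor's Python interpreter can actually import the package:

# Minimal sketch: import orjson on every partition and report where it was
# loaded from, so a missing install on any worker shows up immediately.
def check_orjson(_rows):
    import orjson
    yield orjson.__file__

paths = spark.sparkContext.parallelize(range(8), numSlices=4) \
    .mapPartitions(check_orjson).collect()
print(paths)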