I get an error when I use an external Python module (orjson) inside a foreach function in PySpark. Everything works if I use the module outside foreach, on the rows returned by collect(). Here is my simple code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
import orjson
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("HelloSparkSQL") \
        .getOrCreate()
    data = [[1, "male"], [2, "male"], [3, "female"], [4, "female"], [10, "male"]]
    schema = StructType([StructField("Age", IntegerType()), StructField("Gender", StringType())])
    surveyDF = spark.createDataFrame(data=data, schema=schema)
    countDF = surveyDF.select("Age", "Gender").limit(20)
    list1 = countDF.collect()
    for row in list1:
        data = {
            "age": row["Age"],
            "gender": row["Gender"]
        }
        newjson = orjson.dumps(data)
        print(newjson)
    # b'{"age":1,"gender":"male"}'
    # b'{"age":2,"gender":"male"}'
    # b'{"age":3,"gender":"female"}'
    # b'{"age":4,"gender":"female"}'
    # b'{"age":10,"gender":"male"}'
But as you know, iterating over big data after collect() is never a good idea, because collect() pulls every row to the driver. So I switched to a simple foreach function instead (it replaces everything from list1 to the end):
    def jsontest(row):
        data = {
            "age": row["Age"],
            "gender": row["Gender"]
        }
        newjson = orjson.dumps(data)
        print(newjson)

    countDF.foreach(jsontest)
Then I got this error:
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\worker.py", line 668, in main
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\worker.py", line 85, in read_command
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "C:\SparkEverything\spark3_3_0\python\lib\pyspark.zip\pyspark\cloudpickle\cloudpickle.py", line 679, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'orjson'
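Since the traceback comes from the worker side (worker.py), one check I can think of is whether the Python interpreter that runs the workers is the same one where orjson is installed. The sketch below is only a diagnostic, not a fix. `probe_module` is a name I made up, and the commented-out Spark call assumes the `spark` session from my code above.

```python
import sys
from importlib.util import find_spec


def probe_module(name):
    """Report which interpreter is running and whether it can find `name`."""
    spec = find_spec(name)  # None when the top-level module is not importable here
    return {
        "executable": sys.executable,
        "version": sys.version.split()[0],
        "found": spec is not None,
        "origin": getattr(spec, "origin", None),
    }


# On the driver:
print(probe_module("orjson"))

# On a worker (assumes the `spark` session from the code above):
# print(spark.sparkContext.parallelize([0], 1)
#       .map(lambda _: probe_module("orjson")).collect())
```

If the two results show different executables, or "found" is False only on the worker, then the workers are probably running a Python where orjson was never installed.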
I followed some guides on Stack Overflow (link), which said I have to put all the dependencies (in my case, the orjson module) into a zip file and pass it to spark-submit with --py-files. But that didn't work either. Below is my orjson module folder:
After zipping the folder and doing as the guide said, I encountered another error:
ModuleNotFoundError: No module named 'orjson.orjson' / 'orjson'
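For reference, the zipping step can be sketched in Python roughly as follows. This is only the mechanical part, written so that the package folder sits at the root of the archive (which is what an import from a zip expects). `zip_package` is a name I made up, and the site-packages path in the comment is a placeholder, not my real one.

```python
import shutil
from pathlib import Path


def zip_package(site_packages, package, out_dir="."):
    """Zip <site_packages>/<package> so the archive's top-level entry is <package>/."""
    # make_archive appends ".zip" to base_name by itself
    base_name = str(Path(out_dir).resolve() / package)
    return shutil.make_archive(base_name, "zip", root_dir=site_packages, base_dir=package)


# Hypothetical usage; the path below is a placeholder:
# archive = zip_package(r"C:\path\to\site-packages", "orjson")
# spark.sparkContext.addPyFile(archive)  # or: spark-submit --py-files orjson.zip app.py
```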
I think this method only works for a custom .py file with custom functions/modules, and not for a module installed with "pip install x". I also had no luck opening the orjson.cp39-win_amd64.pyd file.
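One way I could test that guess is to check whether the zip contains compiled extension files. As far as I know, Python's zip import mechanism cannot load compiled extensions (.pyd on Windows, .so on Linux) from an archive, and a .pyd is a binary DLL, which would also explain why I can't open it as text. A minimal sketch; `compiled_members` and `COMPILED_SUFFIXES` are names I made up:

```python
import zipfile
from pathlib import PurePosixPath

# Suffixes of compiled extension modules (Windows .pyd, Unix .so).
COMPILED_SUFFIXES = {".pyd", ".so"}


def compiled_members(zip_path):
    """Return the archive members that are compiled extension modules."""
    with zipfile.ZipFile(zip_path) as archive:
        return [
            name
            for name in archive.namelist()
            if PurePosixPath(name).suffix.lower() in COMPILED_SUFFIXES
        ]


# Hypothetical usage on the zip I built:
# print(compiled_members("orjson.zip"))
```

If the list is not empty, the package is not pure Python, and the zip approach from the guides is unlikely to be enough for it.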