I am reading a CSV file into a DataFrame in PySpark using the code snippet below.
Project structure:

pyspark-debug/
    app/
        __init__.py
        data_pipeline.py
    main.py
    ...
data_pipeline.py
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import Row


def row_wise_transformation(row: Row):
    return row  # need to implement some transformation here


def test_data_pipeline():
    spark_session = (
        SparkSession.builder
        .master("local[*]")
        .appName("pyspark-debug")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.eventLog.enabled", "false")  # make it true when running on prod
        .config("spark.rdd.compress", "true")
        .config("spark.sql.autoBroadcastJoinThreshold", "4GB")
        .config("spark.sql.broadcastTimeout", 15 * 60)
        .config("spark.sql.parquet.enableVectorizedReader", "false")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.debug.maxToStringFields", "15000")
        .getOrCreate()
    )

    file_path = "file:///Users/mohdnayyar/Downloads/color_srgb.csv"  # change path accordingly
    df: DataFrame = spark_session.read.text(file_path)
    transform_df = df.rdd.map(lambda row: row_wise_transformation(row)).toDF()  # this map call causes the error
    transform_df.show(truncate=False)


if __name__ == '__main__':
    test_data_pipeline()
main.py
from app.data_pipeline import test_data_pipeline


if __name__ == '__main__':
    test_data_pipeline()
One can download the sample CSV file from here.
When I run data_pipeline.py directly with spark-submit, it works fine. But when I try to run the main.py file with spark-submit, I get a serialization error as below:
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 587, in main
>     func, profiler, deserializer, serializer = read_command(pickleSer, infile)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
>     command = serializer._read_with_length(file)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
>     return self.loads(obj)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
>     return pickle.loads(obj, encoding=encoding)
> ModuleNotFoundError: No module named 'app'
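For comparison, the direct invocation that works is roughly the following; the exact path is an assumption on my side and extra flags are omitted for brevity:

# Assumed working invocation: the file is submitted as a standalone script,
# so nothing ever needs to import the 'app' package on the workers
spark-submit pyspark-debug/app/data_pipeline.py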
My spark-submit command is as follows:
spark-submit \
--verbose \
--name "L1_L2_DATA_PIPELINE" \
--master local \
--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.hadoop:hadoop-aws:3.2.0 \
--conf spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true \
--conf spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true \
--py-files pyspark-debug.zip \
pyspark-debug/main.py
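For context, pyspark-debug.zip was created roughly as follows; this packaging step is not part of the commands above and is only a sketch, so the real archive layout may differ:

# Assumed packaging step: run from inside the pyspark-debug directory so that
# the 'app' package ends up at the top level of the archive
cd pyspark-debug
zip -r ../pyspark-debug.zip app
cd ..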
This question is similar to one, two, three, four, etc.
Most of the answers suggest creating a dependency zip and providing it via --py-files, as in --py-files dependency.zip, but that does not work for me; I still get the same error. Roughly what I tried is shown below.
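What I tried based on those answers looks roughly like this; the archive name dependency.zip comes from the linked answers and the paths are illustrative:

# Zip only the 'app' package and ship it to the executors with --py-files (illustrative paths)
cd pyspark-debug
zip -r ../dependency.zip app
cd ..
spark-submit --py-files dependency.zip pyspark-debug/main.py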