I'm having real fun with a local installation of Spark on a Windows 10 machine. I'm trying to read a SAS data file into a Spark dataframe from where I want to write it to a parquet file on an S3 bucket.
The code from my Jupyter Notebook is below.
The error that I get is below the code. It's related to the saurfang package.
The code worked until I added in the SparkContext() stuff (bookended with '<here---' and '>here----' in the code) which I believe I need in order to work with S3 on my local machine (lots of resolved problems got me to this point). This was when I added in the 'df.write.mode...' line too.
I have assumed that create_spark_session() will use the context details built prior to its definition. I'm wondering if some part of the SparkContext() configurations are preventing this line from working correctly:
.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
I am new to Spark and any help will be appreciated.
import configparser # to work with the configuration file
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType, IntegerType, TimestampType,FloatType)
import pandas as pd
import findspark
import pyspark
config = configparser.ConfigParser()
config.read('dl.cfg') # edit this file to include your own values.
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_DEFAULT_REGION']=config['AWS']['AWS_DEFAULT_REGION']
SAS_DATA_LOCATION=config['OTHER']['SAS_DATA_LOCATION']
BUCKET_NAME = config.get("OTHER","BUCKET_NAME")
#<HERE-------------
# see: https://gist.github.com/asmaier/5768c7cda3620901440a62248614bbd0
sc=pyspark.SparkContext()
# see https://github.com/databricks/spark-redshift/issues/298#issuecomment-271834485
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# see https://stackoverflow.com/questions/28844631/how-to-set-hadoop-configuration-values-from-pyspark
hadoop_conf=sc._jsc.hadoopConfiguration()
# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
# see http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos/
hadoop_conf.set("fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + os.environ['AWS_DEFAULT_REGION'] + ".amazonaws.com")
#>HERE-----------
def create_spark_session():
"""Create a spark session in which to work on the data."""
spark = SparkSession \
.builder \
.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
.config("spark.driver.extraClassPath", "C:/Progra~1/Java/jdk1.8.0_251/postgresql-42.2.14.jar")\
.getOrCreate()
return spark
def spark_to_s3(read_format,fpath,tname,delimiter=','):
"""
Create a Spark dataframe from an input file.
Args:
read_format: E.g. csv.
fpath: Full path for your input file, e.g. 'c:\your_file.csv'.
tname: The name of the file to write
delimiter: E.g. ','
"""
spark = create_spark_session()
df =spark.read.format(read_format) \
.option("header","true") \
.option("delimiter",delimiter) \
.load(fpath)
print(df.printSchema())
df.write.mode("overwrite").parquet('s3a://' + BUCKET_NAME + f'/parquet/{tname}.parquet')
# build a dictionary of arguments for the four input files
parameters_dict = {'immigration': {'read_format':'com.github.saurfang.sas.spark',\
'fpath':SAS_DATA_LOCATION + 'i94_apr16_sub.sas7bdat','delimiter':','}
}
# iterate through the dictionary, writing each dataframe to a parquet file
for k in parameters_dict.keys():
spark_to_s3(parameters_dict[k]['read_format'],parameters_dict[k]['fpath'],k,parameters_dict[k]['delimiter'])
This is the error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-4-9aaee5b3167c> in <module>
9 # iterate through the dictionary, writing each dataframe to a parquet file
10 for k in parameters_dict.keys():
---> 11 spark_to_s3(parameters_dict[k]['read_format'],parameters_dict[k]['fpath'],k,parameters_dict[k]['delimiter'])
<ipython-input-3-04778a385a6a> in spark_to_s3(read_format, fpath, tname, delimiter)
14 .option("header","true") \
15 .option("delimiter",delimiter) \
---> 16 .load(fpath)
17 print(df.printSchema())
18 df.write.mode("overwrite").parquet('s3a://' + BUCKET_NAME + f'/parquet/{tname}.parquet')
~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
164 self.options(**options)
165 if isinstance(path, basestring):
--> 166 return self._df(self._jreader.load(path))
167 elif path is not None:
168 if type(path) != list:
~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o37.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.github.saurfang.sas.spark. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.github.saurfang.sas.spark.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 13 more