
I'm having real fun with a local installation of Spark on a Windows 10 machine. I'm trying to read a SAS data file into a Spark dataframe and then write it out as a parquet file to an S3 bucket.

The code from my Jupyter Notebook is below.

The error that I get is below the code. It's related to the saurfang package.

The code worked until I added the SparkContext() configuration (bookended with '<HERE---' and '>HERE----' markers in the code), which I believe I need in order to work with S3 from my local machine (a lot of resolved problems got me to this point). I added the 'df.write.mode...' line at the same time.

I have assumed that create_spark_session() will use the context details built before its definition. I'm wondering whether some part of the SparkContext() configuration is preventing this line from working correctly:

.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \

I am new to Spark and any help will be appreciated.
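
For context, this is the untested alternative I have in mind: put everything on the SparkSession builder so that no SparkContext exists before spark.jars.packages is applied. The spark.hadoop.* prefix and the exact key names here are my assumption from reading around, not something I've verified (it relies on the os.environ values set in my code further down):

import os
from pyspark.sql import SparkSession

def create_spark_session_alt():
    """Untested sketch: every setting goes on the builder, no separate SparkContext()."""
    # Assumption: the spark.hadoop.* prefix forwards these keys to the Hadoop
    # configuration, so the s3a settings and the saurfang package are all
    # declared before the first context is created.
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY']) \
        .config("spark.hadoop.fs.s3a.endpoint", "s3." + os.environ['AWS_DEFAULT_REGION'] + ".amazonaws.com") \
        .getOrCreate()
    return spark

Here is the actual code from my notebook: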

import configparser # to work with the configuration file
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType, IntegerType, TimestampType,FloatType)
import pandas as pd
import findspark
import pyspark


config = configparser.ConfigParser()
config.read('dl.cfg') # edit this file to include your own values.

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_DEFAULT_REGION']=config['AWS']['AWS_DEFAULT_REGION']

SAS_DATA_LOCATION=config['OTHER']['SAS_DATA_LOCATION']

BUCKET_NAME = config.get("OTHER","BUCKET_NAME")

#<HERE-------------
# see: https://gist.github.com/asmaier/5768c7cda3620901440a62248614bbd0
sc=pyspark.SparkContext()

# see https://github.com/databricks/spark-redshift/issues/298#issuecomment-271834485
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# see https://stackoverflow.com/questions/28844631/how-to-set-hadoop-configuration-values-from-pyspark
hadoop_conf=sc._jsc.hadoopConfiguration()

# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

# see http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos/
hadoop_conf.set("fs.s3a.connection.maximum", "100000")

# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + os.environ['AWS_DEFAULT_REGION'] + ".amazonaws.com")

#>HERE-----------

def create_spark_session():
    """Create a spark session in which to work on the data."""
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .config("spark.driver.extraClassPath", "C:/Progra~1/Java/jdk1.8.0_251/postgresql-42.2.14.jar")\
        .getOrCreate()
    return spark

def spark_to_s3(read_format,fpath,tname,delimiter=','):
    """
    Create a Spark dataframe from an input file. 
    
    Args:
    
    read_format: E.g. csv.
    fpath: Full path for your input file, e.g. 'c:\your_file.csv'.
    tname: The name of the output parquet file to write to S3.
    delimiter: E.g. ','
    """
    spark = create_spark_session()
    df = spark.read.format(read_format) \
                  .option("header","true") \
                  .option("delimiter",delimiter) \
                  .load(fpath)
    print(df.printSchema())
    df.write.mode("overwrite").parquet('s3a://' + BUCKET_NAME + f'/parquet/{tname}.parquet')

# build a dictionary of arguments for the four input files
parameters_dict = {
    'immigration': {'read_format': 'com.github.saurfang.sas.spark',
                    'fpath': SAS_DATA_LOCATION + 'i94_apr16_sub.sas7bdat',
                    'delimiter': ','}
}

# iterate through the dictionary, writing each dataframe to a parquet file
for k in parameters_dict.keys():
    spark_to_s3(parameters_dict[k]['read_format'],parameters_dict[k]['fpath'],k,parameters_dict[k]['delimiter'])

This is the error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-9aaee5b3167c> in <module>
      9 # iterate through the dictionary, writing each dataframe to a parquet file
     10 for k in parameters_dict.keys():
---> 11     spark_to_s3(parameters_dict[k]['read_format'],parameters_dict[k]['fpath'],k,parameters_dict[k]['delimiter'])

<ipython-input-3-04778a385a6a> in spark_to_s3(read_format, fpath, tname, delimiter)
     14                   .option("header","true") \
     15                   .option("delimiter",delimiter) \
---> 16                   .load(fpath)
     17     print(df.printSchema())
     18     df.write.mode("overwrite").parquet('s3a://' + BUCKET_NAME + f'/parquet/{tname}.parquet')

~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
    164         self.options(**options)
    165         if isinstance(path, basestring):
--> 166             return self._df(self._jreader.load(path))
    167         elif path is not None:
    168             if type(path) != list:

~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.github.saurfang.sas.spark. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.github.saurfang.sas.spark.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
    ... 13 more
  • My advice: bump the Log4J configuration to `DEBUG` level to get a hint of what happens to Spark at init time. If you are lucky, there is a syntax error or a network failure that prevents Spark from pulling its dependency. If you are unlucky, the custom Spark ClassLoader messes with the Scala default ClassLoader which messes with the base ClassLoader of the JVM, and you will need to shoehorn the JAR into a ClassPath somehow... – Samson Scharfrichter Jul 01 '20 at 21:22
  • ...meaning the ClassPath of both the driver and the executors (that's where the file is actually read). – Samson Scharfrichter Jul 01 '20 at 21:25
  • You should pass the package while submitting the job – Shubham Jain Jul 02 '20 at 07:38
  • Thanks for your comments. Samson - I'll revisit this at a later date when I have time - it doesn't seem like an easy fix and I'm under some time pressure at the moment. @ShubhamJain - how can I do that? – John Jul 02 '20 at 10:10
  • submit this job to the cluster using spark-submit --packages package_name script.py – Shubham Jain Jul 02 '20 at 10:20
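
A notebook-side sketch of the --packages suggestion above, assuming the PYSPARK_SUBMIT_ARGS route applies when Spark is launched from Jupyter rather than via spark-submit (the exact string is an assumption, not tested against this setup):

import os
# Assumption: pyspark reads PYSPARK_SUBMIT_ARGS when the JVM gateway starts,
# so this must run before any SparkContext/SparkSession is created; the
# trailing 'pyspark-shell' token is required.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages saurfang:spark-sas7bdat:2.0.0-s_2.11 pyspark-shell'

import pyspark
sc = pyspark.SparkContext()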

1 Answer


Download spark-sas7bdat-2.1.0-s_2.11.jar and keep it in an HDFS temp location. The code below reads a SAS file from S3 into a PySpark dataframe.

from pyspark.sql import *
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os 
import boto3
import json
from datetime import date
import time
import sys
from pyspark.sql.functions import col,lit
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Spark settings applied to the SparkSession builder below
spark_conf = {
    'spark.yarn.queue': 'default',
    'spark.jars.packages': 'saurfang:spark-sas7bdat:2.1.0-s_2.11',
    'spark.jars': 'hdfs:///tmp/spark-sas7bdat-2.1.0-s_2.11.jar',
    'spark.driver.memory': '2g',
    'spark.executor.memory': '2g',
    'spark.dynamicAllocation.maxExecutors': '10'}
spark = SparkSession.builder.master("local").appName("app_name").enableHiveSupport()
for k, v in spark_conf.items():
    spark = spark.config(k, v)
        
spark = spark.getOrCreate()
print(spark.version)

aws_access_key="src_accesskey"
aws_secret_access_key="src_secretkey"
endpoint="s3 endpoint"
bucket="bucketname"

spark._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', aws_access_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', aws_secret_access_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', endpoint)
spark._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.enabled', 'false')
spark._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')



inputdf = spark.read.format("com.github.saurfang.sas.spark") \
               .load("s3a://<bucketname>/<path>.sas7bdat")
inputdf.printSchema()
inputdf.show()
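
To finish the round trip back to S3 as parquet (the original goal in the question), the same session should be able to write the dataframe out; the bucket and path below are placeholders, not values from this answer:

# Write the dataframe back to S3 as parquet, overwriting any existing output.
output_path = "s3a://<bucketname>/parquet/<name>.parquet"
inputdf.write.mode("overwrite").parquet(output_path)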