So, the basics are:

  • I'm on Spark 2.x
  • I'm running this all in a Jupyter notebook
  • My goal is to iterate over a number of files in a directory and have Spark (1) create DataFrames and (2) register those DataFrames as Spark SQL tables. Basically, I want to be able to open the notebook at any time and have a clean way of loading everything available to me.

Below are my imports:

import os

from pyspark.sql.functions import *
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

fileDirectory = 'data/'

Below is the actual code:

for fname in os.listdir(fileDirectory):
    df_app = sqlContext.read.format("csv"). \
        option("header", "true"). \
        option("inferSchema", "true"). \
        load(fname)

    df_app.createOrReplaceTempView(fname)

But I'm getting the following error message:

AnalysisException: u'Unable to infer schema for CSV. It must be specified manually.;'

It would appear that it isn't taking issue with the way I'm iterating over the files (great), but it won't infer the schemas. When I load each file manually, this has never been an issue.

Can someone give me some pointers on where I can improve this / how to get it to run?

Many, many thanks!

bordumb
  • Are you sure the path is right? That you want to access the local file system and that your working directory is `data/`? `fname` is just the name of the file, not the full path to it. If the problem comes from a particular file, you should add a print to your loop to see which one it is – MaFF Sep 01 '17 at 20:06
  • Good point. I forgot to mention that, but yes, the path and all of that is correct. If I run the following code, file by file, it works fine: `df_name = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("data/file_name.csv")` `df = df_name` `df.createOrReplaceTempView("df_name")` – bordumb Sep 01 '17 at 20:08
  • So your working directory is not `data`, it's the parent directory of `data`. In your code you're accessing `transaction_dat.csv` directly. Try `fileDirectory + fname` instead – MaFF Sep 01 '17 at 20:10

1 Answer

Since inferSchema is throwing an error, you should manually specify the schema of your CSV data.

Also, as @Marie has mentioned, you would need to slightly modify your load syntax so that the full path (`fileDirectory + fname`) is passed to `load`.

import os

from pyspark.sql.types import *

# Specify the schema explicitly instead of relying on inferSchema.
# Adjust the field names and types to match your CSV files.
customSchema = StructType([
    StructField("string_col", StringType(), True),
    StructField("integer_col", IntegerType(), True),
    StructField("double_col", DoubleType(), True)])

fileDirectory = 'data/'
for fname in os.listdir(fileDirectory):
    df_app = sqlContext.read.format("csv"). \
        option("header", "true"). \
        schema(customSchema). \
        load(fileDirectory + fname)  # full path: directory + file name
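
To also cover the temp-view part of your goal, here is a minimal sketch. The `os.path.splitext` step is an assumption on my part (not in your original code), since temp view names generally can't contain dots like the `.csv` extension:

for fname in os.listdir(fileDirectory):
    df_app = sqlContext.read.format("csv"). \
        option("header", "true"). \
        schema(customSchema). \
        load(fileDirectory + fname)

    # View names can't contain '.', so drop the extension first
    # (e.g. 'transaction_dat.csv' -> 'transaction_dat').
    view_name = os.path.splitext(fname)[0]
    df_app.createOrReplaceTempView(view_name)

You can then query each file with e.g. `sqlContext.sql("SELECT * FROM transaction_dat")`.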

Hope this helps!


Don't forget to let us know if it solved your problem :)

Prem
  • How can this be replicated if the file directory is an S3 bucket? Would be glad if there is a similar implementation. – Nayak S May 15 '20 at 10:34
  • 1
    @SubhankarNayak you may want to look here -https://stackoverflow.com/questions/35803027/retrieving-subfolders-names-in-s3-bucket-from-boto3 – Prem May 15 '20 at 11:55
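
For the S3 case, a rough sketch of the same loop using boto3 to list the keys and `s3a://` paths for Spark to read. `my-bucket` and the `data/` prefix are placeholders, and your cluster needs the S3A connector configured:

import boto3

s3 = boto3.client('s3')
bucket = 'my-bucket'   # placeholder bucket name
prefix = 'data/'       # placeholder key prefix

# List objects under the prefix and load each CSV the same way as above.
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp.get('Contents', []):
    key = obj['Key']
    if not key.endswith('.csv'):
        continue
    df = sqlContext.read.format("csv"). \
        option("header", "true"). \
        schema(customSchema). \
        load("s3a://{}/{}".format(bucket, key))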