My question is kind of similar to this one, but the suggested solutions did not solve my problem.

I have a very simple Spark job that I want to run locally; basically, it just reads a file from S3 and creates a DataFrame out of it.

When I run my code on an Amazon EC2 cluster everything works fine, but when I try to run it locally I get this error:

    Caught exception while loading path, returning empty data frame: No FileSystem for scheme: s3a
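
The read itself is just a plain DataFrame load of an s3a path, roughly along these lines, where ss is the SparkSession built by the helpers shown further down (bucket, key and format here are placeholders, not the real ones):

    // Rough placeholder for the read; the real job takes its path and format from
    // configuration and catches the exception, returning an empty data frame
    // (hence the message above).
    val df = ss.read
      .option("header", "true")
      .csv("s3a://my-bucket/some/prefix/data.csv")

    df.show(5)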

Here are the differences between running the code on the EC2 cluster and running it locally:

When I run it locally, I commented out all the provided scopes for the Spark dependencies:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
        <!--<scope>provided</scope>-->
    </dependency>

Also, I added this dependency:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>3.0.0</version>
    </dependency>

And this is how I create the Spark session locally and on the cluster.

On the cluster:

      def getSparkSession(config: BaseConfig): SparkSession = {

        val conf = new SparkConf()

        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.set("spark.kryoserializer.buffer.mb", "24")

        config.getSparkConfig.foreach(x => conf.set(x._1, x._2))

        val ss = SparkSession
          .builder()
          .config(conf)
          .getOrCreate()

        ss.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", config.awsAccessKeyId)
        ss.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", config.awsSecretAccessKey)

        ss
      }

When I run it locally:

      def getLocalSparkSession(config: BaseConfig): SparkSession = {
        val warehouseLocation = new File("spark-warehouse").getAbsolutePath

        val ss = SparkSession.builder()
          .appName(this.getClass.getName)
          .master("local")
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        ss.sparkContext.setLogLevel("WARN")

        ss.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", config.awsAccessKeyId)
        ss.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", config.awsSecretAccessKey)

        ss
      }

P.S. My spark-shell --version shows it uses Spark version 2.2.1, but I think the Spark on my EC2 cluster is an older version (it should be 2.0-something).

1 Answer

Don't mix hadoop-aws versions with those Spark was built with; it will not work. You are probably just seeing one symptom of this (in Hadoop 2.7 the filesystem was self-registering, but in Hadoop 2.8+ it is explicitly registered in hadoop-common's core-default.xml file).
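
A quick way to see which Hadoop version your Spark build actually has on the classpath (just a sketch; VersionInfo lives in hadoop-common, so it reports whatever Hadoop Spark pulled in):

    // Print the Hadoop version on the Spark classpath; the hadoop-aws JAR
    // you add must match this version exactly.
    import org.apache.hadoop.util.VersionInfo

    println(s"Hadoop version: ${VersionInfo.getVersion}")

You can paste this into spark-shell or drop it into the job itself.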

Ideally you should just be able to ask for the spark-hadoop-cloud module to go with the release, but I don't see it in the public repos.

Until then, work out the Hadoop version, add the hadoop-aws module for that release to your build, and use the fs.s3a settings (including the keys).
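
For example, if that check reports Hadoop 2.7.x, the hadoop-aws entry in the pom should be that same 2.7.x release rather than 3.0.0, and the credentials should go through the s3a properties instead of the s3n ones used above. A rough sketch (the property names are the standard S3A ones; ss and config are the question's SparkSession and config object):

    // Hypothetical sketch: supply credentials through the s3a properties.
    val hc = ss.sparkContext.hadoopConfiguration

    hc.set("fs.s3a.access.key", config.awsAccessKeyId)
    hc.set("fs.s3a.secret.key", config.awsSecretAccessKey)
    // Explicitly map the scheme to the S3A filesystem class; this is harmless
    // if the matching hadoop-aws JAR has already registered it.
    hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")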
