4

I have some files stored in a google bucket. Those are my settings as suggested here.

spark = SparkSession.builder.\
        master("local[*]").\
        appName("TestApp").\
        config("spark.serializer", KryoSerializer.getName).\
        config("spark.jars", "/usr/local/.sdkman/candidates/spark/2.4.4/jars/gcs-connector-hadoop2-2.1.1.jar").\
        config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName).\
        getOrCreate()
#Recommended settings for using GeoSpark
spark.conf.set("spark.driver.memory", 6)
spark.conf.set("spark.network.timeout", 1000)
#spark.conf.set("spark.driver.maxResultSize", 5)
spark.conf.set

spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
# This is required if you are using service account and set true, 
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'false')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "myJson.json")



path = 'mBucket-c892b51f8579.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path
client = storage.Client()
name = 'https://console.cloud.google.com/storage/browser/myBucket/'
bucket_id = 'myBucket'
bucket = client.get_bucket(bucket_id)

I can read them simple using the following:

df = pd.read_csv('gs://myBucket/myFile.csv.gz', compression='gzip')
df.head()

    time_zone_name           province_short
0   America/Chicago              US.TX
1   America/Chicago              US.TX
2   America/Los_Angeles          US.CA
3   America/Chicago              US.TX
4   America/Los_Angeles          US.CA

I am trying to read the same file with pyspark

myTable = spark.read.format("csv").schema(schema).load('gs://myBucket/myFile.csv.gz', compression='gzip')

but I get the following error

Py4JJavaError: An error occurred while calling o257.load.
: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
emax
  • 6,965
  • 19
  • 74
  • 141
  • Does this answer your question? [pyspark error: : java.io.IOException: No FileSystem for scheme: gs](https://stackoverflow.com/questions/55595263/pyspark-error-java-io-ioexception-no-filesystem-for-scheme-gs) – user10938362 Mar 23 '20 at 12:50
  • @user10938362 well it is kind of complicated. Do you have suggestions or some line of codes to share? – emax Mar 23 '20 at 13:18
  • For testing, does this work if you have the file locally? It looks like it may be related to getting it from the URL, possibly an authentication issue could occur. Try myFile.csv.gz on your local drive – Jimmy Smith Mar 23 '20 at 21:02
  • @user10938362 the answer you said does not solve the problem – emax Mar 25 '20 at 12:20
  • It looks like you are missing a connector. Try following the steps from [here](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md). Make sure the downloaded `jar` file contains `com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem` class file. – ywbaek Mar 25 '20 at 13:01
  • @oldwooki I will check, I downloaded the `jar` from official sources – emax Mar 25 '20 at 13:03
  • @oldwooki how can I open the `jar` to check it? – emax Mar 25 '20 at 13:06
  • @emax using 7-zip or other similar tools – ywbaek Mar 25 '20 at 13:43

2 Answers2

4

Welcome to the hadoop dependency hell !

1. Use packages rather than jars

Your configuration is basically correct but when you add the gcs-connector as a local jar you also need to manually ensure all its dependencies are available in the JVM classpath.

It's usually easier to add the connector as a package and let spark deal with the dependencies so instead of config("spark.jars", "/usr/local/.sdkman/candidates/spark/2.4.4/jars/gcs-connector-hadoop2-2.1.1.jar") use config('spark.jars.packages', 'com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.1.1')

2. Manage ivy2 dependencies resolution issues

When you do as above, spark will likely complain that it can't download some dependencies due to resolution differences between maven (used for publication) and ivy2 (used by spark for dependency resolution).

You can usually fix this by simply asking spark to ignore the unresolved dependencies using spark.jars.excludes so add a new config line such as config('spark.jars.excludes','androidx.annotation:annotation,org.slf4j:slf4j-api')

3. Manage classpath conflicts

When this is done, the SparkSession will start but the filesystem will still fail because the standard distribution of pyspark packages an old version of guava library that doesn't implement the API the gcs-connector relies on.

You need to ensure that gcs-connector will find its expected version first by using the following configs config('spark.driver.userClassPathFirst','true') and config('spark.executor.userClassPathFirst','true')

4. Manage dependency conflicts

Now you may think everything is OK but actually no because the default pyspark distribution contains version 2.7.3 of hadoop libraries but the gcs-connector version 2.1.1 relies on 2.8+ only APIs.

Now your options are:

  • use a custom build of spark with a newer hadoop (or the package with no built-in hadoop libraries)
  • use an older version of gcs-connector (version 1.9.17 works fine)

5. A working config at last

Assuming you want to stick with the PyPi or Anaconda latest distribution of pyspark, the following config should work as expected.

I've included only the gcs relevant configs, moved the Hadoop config directly into the spark config and assumed you are correctly setting your GOOGLE_APPLICATION_CREDENTIALS:

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
        master("local[*]").\
        appName("TestApp").\
        config('spark.jars.packages', 
               'com.google.cloud.bigdataoss:gcs-connector:hadoop2-1.9.17').\
        config('spark.jars.excludes',
               'javax.jms:jms,com.sun.jdmk:jmxtools,com.sun.jmx:jmxri').\
        config('spark.driver.userClassPathFirst','true').\
        config('spark.executor.userClassPathFirst','true').\
        config('spark.hadoop.fs.gs.impl',
               'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem').\
        config('spark.hadoop.fs.gs.auth.service.account.enable', 'false').\
        getOrCreate()

Note that gcs-connector version 1.9.17 has a different set of excludes than 2.1.1 because why not...

PS: You also need to ensure you're using a Java 1.8 JVM because Spark 2.4 doesn't work on newer JVMs.

rluta
  • 6,717
  • 1
  • 19
  • 21
0

In addition to @rluta's great answer, you can also replace the userClassPathFirst lines by specifically putting the guava jars in extraClassPath:

spark.driver.extraClassPath=/root/.ivy2/jars/com.google.guava_guava-27.0.1-jre.jar:/root/.ivy2/jars/com.google.guava_failureaccess-1.0.1.jar:/root/.ivy2/jars/com.google.guava_listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
spark.executor.extraClassPath=/root/.ivy2/jars/com.google.guava_guava-27.0.1-jre.jar:/root/.ivy2/jars/com.google.guava_failureaccess-1.0.1.jar:/root/.ivy2/jars/com.google.guava_listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar

It's a bit hackish as you need the exact local ivy2 path, although you can also download/copy the jars to somewhere more permanent.

But, this reduces other potential dependency conflicts such as with livy, which throws java.lang.NoClassDefFoundError: org.apache.livy.shaded.json4s.jackson.Json4sModule if gcs-connector's jackson dependencies are in front of the classpath.

Joren Van Severen
  • 2,269
  • 2
  • 24
  • 30