
I am trying to read a JSON file from a Google Cloud Storage bucket into a PySpark DataFrame on a local Spark installation. Here's the code:

import pandas as pd
import numpy as np

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAll([('spark.executor.memory', '16g'),
                        ('spark.executor.cores','4'),
                         ('spark.cores.max','4')]).setMaster('local[*]')


spark = (SparkSession.
              builder.
              config(conf=conf).
              getOrCreate())


sc = spark.sparkContext

import glob
import bz2
import json
import pickle

# storage comes from the google-cloud-storage package
from google.cloud import storage


bucket_path = "gs://<SOME_PATH>/"
client = storage.Client(project='<SOME_PROJECT>')
bucket = client.get_bucket('<SOME_PATH>')
blobs = bucket.list_blobs()

theframes = []

for blob in blobs:
    print(blob.name)        
    testspark = spark.read.json(bucket_path + blob.name).cache()
    theframes.append(testspark) 

It lists the files in the bucket fine (I can see the printout of blob.name), but then crashes like this:

Traceback (most recent call last):
  File "test_code.py", line 66, in <module>
    testspark = spark.read.json(bucket_path + blob.name).cache()
  File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 274, in json
    return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.json.
: java.io.IOException: No FileSystem for scheme: gs

I've seen this type of error discussed on Stack Overflow, but most solutions seem to be for Scala rather than PySpark, and/or involve editing core-site.xml, which I've done to no effect.

I am using Spark 2.4.1 and Python 3.6.7.

Help would be much appreciated!


2 Answers


Some configuration parameters are required for Spark to recognize "gs" as a distributed filesystem.

Use this setting to point Spark at the Google Cloud Storage connector jar, gcs-connector-hadoop2-latest.jar:

spark = SparkSession \
        .builder \
        .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar") \
        .getOrCreate()

Other configs can be set directly from PySpark:

spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
# The following two are required if you are using a service account
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "/path/to/keyfile")
# The following are required if you are using OAuth
spark._jsc.hadoopConfiguration().set('fs.gs.auth.client.id', 'YOUR_OAUTH_CLIENT_ID')
spark._jsc.hadoopConfiguration().set('fs.gs.auth.client.secret', 'OAUTH_SECRET')
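
Putting these together for the original question, a minimal sketch might look like the following (the jar path, keyfile path, and file name are placeholders to substitute with your own):

from pyspark.sql import SparkSession

# Make the GCS connector jar available to Spark
spark = SparkSession \
    .builder \
    .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar") \
    .getOrCreate()

# Register the "gs" scheme and authenticate with a service account keyfile
spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', '/path/to/keyfile')

# Reading JSON from the bucket should now resolve the gs:// scheme
df = spark.read.json("gs://<SOME_PATH>/<SOME_FILE>.json")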

Alternatively you can set up these configs in core-site.xml or spark-defaults.conf.
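
For instance, a rough spark-defaults.conf equivalent of the settings above, using the spark.hadoop prefix described in the next section (paths are placeholders):

spark.jars                                                   /path/to/gcs-connector-hadoop2-latest.jar
spark.hadoop.fs.gs.impl                                      com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.gs.auth.service.account.enable               true
spark.hadoop.google.cloud.auth.service.account.json.keyfile  /path/to/keyfile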

Hadoop Configuration on Command Line

You can also use spark.hadoop-prefixed configuration properties to set things up when launching pyspark (or spark-submit in general), e.g.

--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
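
For example, a sketch of launching the script from the question with everything passed on the command line (jar and keyfile paths are placeholders):

spark-submit \
    --jars /path/to/gcs-connector-hadoop2-latest.jar \
    --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
    --conf spark.hadoop.fs.gs.auth.service.account.enable=true \
    --conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=/path/to/keyfile \
    test_code.py
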
Ranga Vure

Further to Ranga Vure's answer, concretely for Spark with Hadoop 3 you can do:

spark = SparkSession.builder \
    .appName('spark-run-with-gcp-bucket') \
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \
    .getOrCreate()

This pulls the required jar file directly from Google. You can find other versions on Google's Hadoop connectors page, under "Download the connector". You can copy the links to the connectors and put them in the config instead of downloading them locally.

I then set another config:

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

and I am then able to read a CSV file directly from my GCS bucket with:

df = spark \
    .read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .csv("gs://<BUCKET>/<FILE.csv>")

Note: I have already set the following on my machine so that authentication with GCP works properly:

import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "<PATH TO CREDENTIALS WITH PERMISSION TO VIEW BUCKET OBJECT>"
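
The same session should handle the JSON files from the original question as well; a minimal sketch (bucket and file names are placeholders):

df = spark \
    .read \
    .json("gs://<BUCKET>/<FILE.json>")
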
Ocean