I am trying to build a data pipeline that requests data from a REST API. The response is nested JSON, which I want to read into a PySpark DataFrame. This works fine when I save the response to a local file and use the following code:

from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession\
    .builder\
    .appName("jsontest")\
    .getOrCreate()

raw_df = spark.read.json(r"my_json_path", multiLine=True)

But when I try to create a PySpark DataFrame directly from the API response, I get the following error:

[Image: error when trying to create a PySpark DataFrame]

I use the following code for the REST API call and the conversion to a PySpark DataFrame:

apiCallHeaders = {'Authorization': 'Bearer ' + bearer_token}
apiCallResponse = requests.get(data_url, headers=apiCallHeaders, verify=True)
json_rdd = spark.sparkContext.parallelize(apiCallResponse.text)
raw_df = spark.read.json(json_rdd)

The following is part of the response output:

{"networks":[{"href":"/v2/networks/velobike-moscow","id":"velobike-moscow","name":"Velobike"},{"href":"/v2/networks/bycyklen","id":"bycyklen","name":"Bycyklen"},{"href":"/v2/networks/nu-connect","id":"nu-connect","name":"Nu-Connect"},{"href":"/v2/networks/baerum-bysykkel","id":"baerum-bysykkel","name":"Bysykkel"},{"href":"/v2/networks/bysykkelen","id":"bysykkelen","name":"Bysykkelen"},{"href":"/v2/networks/onroll-a-rua","id":"onroll-a-rua","name":"Onroll"},{"href":"/v2/networks/onroll-albacete","id":"onroll-albacete","name":"Onroll"},{"href":"/v2/networks/onroll-alhama-de-murcia","id":"onroll-alhama-de-murcia","name":"Onroll"},{"href":"/v2/networks/onroll-almunecar","id":"onroll-almunecar","name":"Onroll"},{"href":"/v2/networks/onroll-antequera","id":"onroll-antequera","name":"Onroll"},{"href":"/v2/networks/onroll-aranda-de-duero","id":"onroll-aranda-de-duero","name":"Onroll"}

I hope my problem makes sense and that someone can help.

Thanks in advance!

  • Update your question with your API response, not from my example, and write as text please – Kafels Jul 07 '21 at 17:54
  • I cannot share the output of my response as it is sensitive data. But I tried with your API and I get the same error.. Could it have something to do with the way I install spark? – Saifullah Babrak Jul 08 '21 at 06:41
  • @Kafels, yeah it might have something to do with the installation. Should I maybe reinstall? – Saifullah Babrak Jul 08 '21 at 12:13
  • Before reinstalling, try [this](https://stackoverflow.com/questions/48260412/environment-variables-pyspark-python-and-pyspark-driver-python/65010346#65010346) – Kafels Jul 08 '21 at 12:16
  • THANK YOU very much, @Kafels!!!! It worked... Have a nice one, mate :) – Saifullah Babrak Jul 08 '21 at 17:56

1 Answer

Following this answer you might add these lines:

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

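To make your code run, you must also wrap the response text in a list ([ ]) when calling parallelize. Otherwise the string itself is treated as the collection, and each character becomes a separate RDD element: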

rdd = spark.sparkContext.parallelize([apiCallResponse.text])
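As a rough illustration of why the brackets matter, here is a plain-Python sketch that needs no Spark. It relies only on the fact that parallelize distributes the elements of whatever collection it receives. The `payload` string is a made-up stand-in for `apiCallResponse.text`, not the real response.

```python
import json

# Hypothetical stand-in for apiCallResponse.text (not the real API response)
payload = '{"networks":[{"id":"bycyklen","name":"Bycyklen"}]}'

# Iterating over a str yields single characters, so passing the bare string
# to parallelize would produce one element per character.
as_string = list(payload)

# Wrapping the str in a list yields exactly one element: the whole document.
as_list = list([payload])

print(as_string[:3])   # the first few one-character "records"
print(len(as_list))    # a single record holding the full JSON text

# Only the single-element version holds a parseable JSON document
parsed = json.loads(as_list[0])
print(parsed["networks"][0]["id"])
```

With the bare string, spark.read.json would be handed fragments such as `{` and `"`, none of which is a complete JSON record on its own.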

See an example:

import requests

response = requests.get('http://api.citybik.es/v2/networks?fields=id,name,href')
rdd = spark.sparkContext.parallelize([response.text])

df = spark.read.json(rdd)

df.printSchema()
# root
#  |-- networks: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- href: string (nullable = true)
#  |    |    |-- id: string (nullable = true)
#  |    |    |-- name: string (nullable = true)

(df
 .selectExpr('inline(networks)')
 .show(n=5, truncate=False))
# +----------------------------+---------------+----------+
# |href                        |id             |name      |
# +----------------------------+---------------+----------+
# |/v2/networks/velobike-moscow|velobike-moscow|Velobike  |
# |/v2/networks/bycyklen       |bycyklen       |Bycyklen  |
# |/v2/networks/nu-connect     |nu-connect     |Nu-Connect|
# |/v2/networks/baerum-bysykkel|baerum-bysykkel|Bysykkel  |
# |/v2/networks/bysykkelen     |bysykkelen     |Bysykkelen|
# +----------------------------+---------------+----------+
  • Hi Kafels. Thank you for your answer! I have actually already tried using the square brackets inside parallelize, without luck... I get the same error as before. Could it have something to do with the way I installed Spark? – Saifullah Babrak Jul 07 '21 at 17:25
  • Could you please update your question showing the output from `apiCallResponse.text`? – Kafels Jul 07 '21 at 17:34
  • I have added the response output in the question. – Saifullah Babrak Jul 07 '21 at 17:43