13

I need to read some JSON data from a web service that provides REST interfaces, so that I can query the data from my Spark SQL code for analysis. I am able to read a JSON file stored in the blob store and use it.

I was wondering what the best way is to read the data from a REST service and use it like any other DataFrame.

BTW, I am using Spark 1.6 on a Linux cluster on HDInsight, if that helps. I would also appreciate it if someone could share some code snippets for this, as I am still very new to the Spark environment.

Kiran
  • Is it a single JSON blob that you want to parallelize (distribute to nodes) after download, or is it many different JSON strings that you'd rather download directly on the nodes? If a single blob, is it formatted as described at http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets: "Each line must contain a separate, self-contained valid JSON object"? – Ashish Awasthi May 10 '16 at 05:53
  • 1
    @Ashish: These are actually multiple files exposed by my web service, somewhere in the order of 8 to 10, and not really large in size. They are mostly used as context for the data stored in HDFS for my analysis. I did look at the link, but all the examples read local files; is there a way to read it somewhat like this: `val path = "http://www.examples/src/main/resources?type=people"`? – Kiran May 11 '16 at 03:45
  • Spark cannot parse an arbitrary JSON into a DataFrame, because JSON is a hierarchical structure and a DataFrame is flat. If your JSON was not created by Spark, chances are it does not comply with "Each line must contain a separate, self-contained valid JSON object" and will therefore need to be parsed by your own code and then fed to a DataFrame as a collection of case-class objects or Spark SQL Rows. One way to do the parsing in Scala is shown in http://stackoverflow.com/questions/37003083/spark-parquet-nested-value-flatten/37005148#37005148 – Ashish Awasthi May 11 '16 at 04:16
  • @Ashish I don't think parsing is my issue, as I can put the resulting JSON into the blob store and use it. The issue I am facing is getting it directly from the REST endpoint instead of from the local blob store. – Kiran May 11 '16 at 04:40
  • 1
    You can do it using scalaj.http.Http as shown in the answer. Note that Spark can't do it for you, as described above. – Ashish Awasthi May 11 '16 at 04:46

3 Answers

7

On Spark 1.6:

If you are on Python, use the requests library to get the information and then just create an RDD from it. There must be some similar library for Scala (relevant thread). Then just do:

# json_str would typically come from your REST call, e.g.:
# json_str = requests.get("https://your-service/endpoint").text
json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}'
rdd = sc.parallelize([json_str])
json_df = sqlContext.jsonRDD(rdd)  # Spark 1.6 API; sqlContext.read.json(rdd) also works and is preferred
json_df

Code for Scala:

val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

This is from: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
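
A minimal end-to-end sketch of the same idea in Scala on Spark 1.6 (the URL is a placeholder, and scala.io.Source is just one simple way to issue the HTTP GET; an HTTP client such as scalaj-http works the same way):

import scala.io.Source

// Fetch the JSON from the REST endpoint (placeholder URL);
// assumes the service returns a self-contained JSON document (or one per line)
val jsonString = Source.fromURL("http://www.example.com/api?type=people").mkString

// Ship the string to the cluster and let Spark infer the schema
val jsonRDD = sc.parallelize(Seq(jsonString))
val df = sqlContext.read.json(jsonRDD)
df.printSchema()
df.show()

Note that the whole response is materialized on the driver before being parallelized, which is fine for small payloads like the ones described in the comments, but not for large downloads.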

aggFTW
  • @Kiran please accept answers that do answer your question – aggFTW Jun 21 '16 at 02:45
  • You are parallelizing a collection, and this is the accepted answer! You have everything on your driver node before parallelizing, which defeats the whole point of distributed computing. – nomadSK25 Aug 07 '18 at 12:22
  • 1
    When doing such a parallelize, doesn't the data need to fit into the memory of the driver node? Meaning, if the data is fairly large, will parallelize fail? – codeBarer Jan 31 '19 at 03:51
3

Spark cannot parse an arbitrary JSON into a DataFrame, because JSON is a hierarchical structure and a DataFrame is flat. If your JSON was not created by Spark, chances are it does not comply with the condition "Each line must contain a separate, self-contained valid JSON object" and will therefore need to be parsed by your own code and then fed to a DataFrame as a collection of case-class objects or Spark SQL Rows.

You can download it like this:

import scalaj.http._

// "proto:///path/to/json" is a placeholder; use your REST endpoint's URL
val response = Http("proto:///path/to/json")
  .header("key", "val").method("get")
  .asString   // executes the request and returns an HttpResponse[String]
  .body

and then parse your JSON as shown in this answer. Then create a Seq of your case-class objects (say seq) and create a DataFrame as

seq.toDF
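
For illustration, a minimal sketch of that last step, assuming a hypothetical Person case class that matches the JSON fields and using json4s (which ships with Spark) for the parsing:

import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical schema; adapt the fields to your actual JSON
case class Person(name: String, city: String)

implicit val formats = DefaultFormats

// 'response' is the string downloaded with scalaj.http above;
// here it is assumed to be a JSON array of such objects
val people: List[Person] = parse(response).extract[List[Person]]

import sqlContext.implicits._
val df = people.toDF()
df.show()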
Ashish Awasthi
1

Here you go (Spark 2.2):


import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkRestApi {
  def main(args: Array[String]): Unit = {

    val logger = Logger.getLogger("blah")
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("blah")
      .config("spark.sql.warehouse.dir", "C:\\Temp\\hive")
      .master("local[2]")
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Download the whole JSON response on the driver (fine for small payloads)
    val url = "https://api.github.com/users/hadley/orgs"
    val result2 = List(scala.io.Source.fromURL(url).mkString)

    // Distribute the string and let Spark infer the schema
    // (spark.read.json(RDD[String]) works on 2.2; later versions prefer a Dataset[String])
    val githubRdd2 = spark.sparkContext.makeRDD(result2)
    val gitHubDF2 = spark.read.json(githubRdd2)
    println(gitHubDF2)
    gitHubDF2.show()

    spark.stop()
  }
}