
Hello world,

How can I use Spark-Scala to download a CSV file from the web and load it into a spark-csv DataFrame?

Currently I depend on curl in a shell command to get my CSV file.

Here is the syntax I want to enhance:

/* fb_csv.scala
This script should load FB prices from Yahoo.

Demo:
spark-shell -i fb_csv.scala
*/

// I should get prices:
import sys.process._
"/usr/bin/curl -o /tmp/fb.csv http://ichart.finance.yahoo.com/table.csv?s=FB"!

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val fb_df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/fb.csv")

fb_df.head(9)

I want to enhance the above script so it is pure Scala with no shell syntax inside.
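For the download step itself, the curl call can be replaced with `scala.io.Source.fromURL`. Below is a minimal sketch of two helpers (the names `fetchCsv` and `saveTo` are my own, not from any library) that together reproduce what `curl -o` was doing, so the rest of the script can stay unchanged:

```scala
import scala.io.Source
import java.io.PrintWriter

// Fetch the CSV body over HTTP: Source.fromURL opens a connection
// and mkString reads the whole response as one string.
def fetchCsv(url: String): String =
  Source.fromURL(url).mkString

// Persist the body to disk, mirroring what `curl -o` did.
def saveTo(path: String, body: String): Unit = {
  val out = new PrintWriter(path)
  try out.write(body) finally out.close()
}
```

With these helpers the shell line becomes `saveTo("/tmp/fb.csv", fetchCsv("http://ichart.finance.yahoo.com/table.csv?s=FB"))`, and the spark-csv `load("/tmp/fb.csv")` step works as before.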

user3676943
  • I think your best approach is just to read the URI contents outside of the spark code and convert the resulting sequence into a DataFrame. – The Archetypal Paul Sep 25 '16 at 09:36
  • @TheArchetypalPaul I would read it as a stream inside spark. What do you think about that ? – eliasah Sep 25 '16 at 10:37
  • I poked about but couldn't find much detail about that - do you have pointers for how to do this from a URL? Also, these are prices of a single stock, I don't believe the data set will be very large... – The Archetypal Paul Sep 25 '16 at 11:35
  • @TheArchetypalPaul I haven't tried if this solution is actually scalable, but I was thinking about something like this :`sc.parallelize(scala.io.Source.fromURL("http://ichart.finance.yahoo.com/table.csv?s=FB").getLines.toStream)` – eliasah Sep 25 '16 at 15:16
  • `Source.fromFile(...).getLines` is a lazy operation, but then I'm not sure how spark will deal with that. – eliasah Sep 25 '16 at 15:18
  • Don't think a stream adds anything there. I thought you meant Spark Streaming – The Archetypal Paul Sep 25 '16 at 15:53

2 Answers

val content = scala.io.Source.fromURL("http://ichart.finance.yahoo.com/table.csv?s=FB").mkString

val list = content.split("\n").filter(_ != "")

val rdd = sc.parallelize(list)

// In spark-shell the implicits needed for toDF are already in scope;
// in a standalone app, import sqlContext.implicits._ first.
val df = rdd.toDF
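Note that `rdd.toDF` here produces a DataFrame with a single string column of raw lines; the header and fields still need splitting before you get real columns. A pure-Scala sketch of that split (the helper `parseCsv` is hypothetical, and the sample rows below are made up):

```scala
// Split raw CSV lines into a header row and data rows.
// Naive: assumes no quoted fields containing commas.
def parseCsv(lines: Seq[String]): (Array[String], Seq[Array[String]]) = {
  val header = lines.head.split(",")
  val rows = lines.tail.map(_.split(","))
  (header, rows)
}
```

The parsed rows could then be mapped onto a case class before calling `toDF`, so the resulting DataFrame has named, typed columns instead of one string column.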
Samar

I found a better answer in Process CSV from REST API into Spark.

Here you go:

import scala.io.Source._
import org.apache.spark.sql.{Dataset, SparkSession}

// In spark-shell these implicits are already in scope; in a standalone
// app, import spark.implicits._ after creating the SparkSession so
// .toDS() is available.
import spark.implicits._

val res = fromURL(url).mkString.stripMargin.lines.toList
val csvData: Dataset[String] = spark.sparkContext.parallelize(res).toDS()

val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.printSchema()
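The line-splitting step this answer relies on can be checked in isolation, without Spark or network access. A small sketch on a fixed string (the sample rows are made up; `linesIterator` is the non-deprecated equivalent of `.lines` in newer Scala versions):

```scala
// Mirror of the mkString.stripMargin.lines.toList step on a fixed
// body, to see exactly what list gets parallelized.
val body = "Date,Close\n2016-09-23,127.96\n2016-09-22,128.35"
val res = body.stripMargin.linesIterator.toList
// res is a List[String] with one element per CSV line,
// header included.
```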
jack