
I know how to read a CSV file into Spark using spark-csv (https://github.com/databricks/spark-csv), but I already have the CSV file represented as a string and would like to convert this string directly to a DataFrame. Is this possible?

Gary Sharpe

3 Answers


Update for Spark 3.x - although this is really about the JDK rather than Spark: on Java 11+, lines() resolves to java.lang.String.lines(), which returns a Java Stream[String] (and Stream.toList needs Java 16+), so the result has to be converted back to a Scala collection:

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
import scala.collection.JavaConverters._

val csvData: Dataset[String] = ("""
                                  |id, date, timedump
                                  |1, "2014/01/01 23:00:01",1499959917383
                                  |2, "2014/11/31 12:40:32",1198138008843
      """.stripMargin.lines.toList.asScala).toDS()

val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()

Update: starting from Spark 2.2.x there is finally a proper way to do it, using the csv(Dataset[String]) overload of DataFrameReader.

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
  """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
    |2, "2014/11/31 12:40:32",1198138008843
  """.stripMargin.lines.toList).toDS()

val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()

Old Spark versions

Actually you can, though it uses library internals and isn't widely advertised: just create and use your own CsvParser instance. An example that works for me on Spark 1.6.0 and spark-csv_2.10-1.4.0 is below.

import com.databricks.spark.csv.CsvParser
import org.apache.spark.sql.DataFrame

// sc and sqlContext are the usual SparkContext and SQLContext, e.g. as
// provided by the spark-shell.
val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)

// CsvParser is internal to spark-csv, but it takes the same options as the
// public reader.
val csvParser = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)

val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
MxR

You can parse your string as CSV using, e.g., scala-csv:

// CSVParser here is scala-csv's parser; parseLine yields the fields of one
// line, and flatMap silently drops any line that fails to parse.
val myCSVdata: Array[List[String]] = myCSVString.split('\n').flatMap(CSVParser.parseLine(_))

Here you can do a bit more processing: data cleaning, verifying that every line parses well and has the same number of fields, and so on.
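For instance, a minimal sketch of that validation step (the variable names follow the snippet above; using the first row's field count as the reference is my assumption):

// Keep only rows whose field count matches the first row's.
val expectedFields = myCSVdata.head.length
val cleanedCSVdata: Array[List[String]] = myCSVdata.filter(_.length == expectedFields)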

You can then make this an RDD of records:

import org.apache.spark.rdd.RDD

val myCSVRDD: RDD[List[String]] = sparkContext.parallelize(myCSVdata)

Here you can massage your lists of Strings into a case class, to better reflect the fields of your CSV data. You can take some inspiration from the creation of Person records in this example:

https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection

I omit this step.
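For illustration only, here is a hedged sketch of what that omitted step could look like, assuming the two-field Person layout from the linked guide (the case class and field positions are my assumptions, not part of the original answer):

case class Person(name: String, age: Int)

// Turn each parsed record into a Person; rows without exactly two fields are
// dropped rather than failing the job. Note that age.trim.toInt still throws
// on non-numeric input, so real code would validate first.
val myPeopleRDD = myCSVRDD.collect { case List(name, age) => Person(name.trim, age.trim.toInt) }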

You can then convert to a DataFrame:

import spark.implicits._

val myCSVDataframe = myCSVRDD.toDF()

Francois G

The accepted answer wasn't working for me on Spark 2.2.0, but it led me to what I needed with csvData.lines.toList:

import java.io.InputStream
import scala.io.Source
import spark.implicits._

val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString

val csvList = streamString.lines.toList

// SomeCaseClass is your own case class matching the CSV's columns.
spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvList.toDS())
  .as[SomeCaseClass]
stsmurf