  1. I would like to read a text file directly into a DataFrame, not file -> RDD -> DataFrame. Is that possible? I have read a lot, but I cannot make it work; `read` is not working.

  2. While reading it, I want to select specific headers (columns) from it.

Is there a quick solution to this?

Also, what imports do I need?

This is my Scala file:

import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql._

object LoadData {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Job for Loading Data").setMaster("local[*]") // local[*] uses all cores of your machine
    val sc = new SparkContext(conf) // Create the Spark context

    // Load local file data
    val rdd = sc.textFile("src/main/resources/data.txt")
    val df = rdd.toDF()

    // Print the records
    println(rdd.foreach(println))
  }
}

And my build.sbt:

name := "HelloScala"

version := "1.0"

scalaVersion := "2.11.12"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core


libraryDependencies ++= Seq(
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11
"org.apache.spark" %% "spark-core" % "2.3.2",
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11
"org.apache.spark" %% "spark-sql" % "2.3.2"
) 

I get the error: Error:(16, 18) value toDF is not a member of org.apache.spark.rdd.RDD[String] on the line val df = rdd.toDF()

Thank you very much

giorgionasis

3 Answers


Below is an example of reading a CSV file (with headers) directly into a DataFrame:

import org.apache.spark.sql.SparkSession

object DataFrameFromCSVFile {

  def main(args:Array[String]):Unit= {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExample")
      .getOrCreate()

    val filePath="src/main/resources/data.txt"

    //Chaining multiple options
    val df2 = spark.read.options(Map("inferSchema"->"true","sep"->",","header"->"true")).csv(filePath)
    df2.show(false)
    df2.printSchema()

  }
}
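
To also select only specific columns while reading (the second point in the question), call select on the resulting DataFrame. A minimal sketch; "col1" and "col2" are hypothetical header names, so replace them with headers that actually exist in data.txt:

// Keep only the columns you need; names must match the file's header row
val selected = df2.select("col1", "col2")
selected.show(false)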

In case you have an RDD and want to convert it to a DataFrame:

import org.apache.spark.sql.SparkSession

object DataFrameFromRDD {

  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExample")
      .getOrCreate()

    import spark.implicits._
    val rdd = spark.sparkContext.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000)))

    val df = rdd.toDF()

    // TODO: use the df variable
  }
}

You are getting toDF is not a member of org.apache.spark.rdd.RDD[String] on val df = rdd.toDF() because you don't have the implicit imports.

As shown in the example above, add import spark.implicits._ and then try rdd.toDF() again.
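
Applied to the code in the question, a minimal sketch could look like this (the column name "line" is an arbitrary choice for the single string column):

import org.apache.spark.sql.SparkSession

object LoadData {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Job for Loading Data")
      .getOrCreate()

    // Brings the toDF() conversion for RDDs into scope
    import spark.implicits._

    val rdd = spark.sparkContext.textFile("src/main/resources/data.txt")
    val df = rdd.toDF("line") // "line" is an arbitrary name for the single String column

    df.show(false)
  }
}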

Happy coding!!

Thanks

NNK
  • Thank you a lot! The first example worked like a charm! One more bit of help: I want to load only specific columns of the file. I use df.select("example") but it is not working. My delimiter is | for my file. How can I use df.select? Thank you a lot – giorgionasis Dec 25 '18 at 18:14
val df = spark.read.text("file")

What do you mean by specific headers? Is this a CSV? If so:

val df = spark.read.format("csv").option("header", "true").load("file.csv")

then you can

df.select($"header1",$"header2").show() //etc.
Arnon Rotem-Gal-Oz

You need to import spark.implicits._ after you create the SparkSession, and you should use SparkSession instead of SparkConf/SparkContext. The following should work:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Spark Job for Loading Data").master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.read.format("csv").option("header", "true").load("data.txt")
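
Note that import spark.implicits._ imports members of the spark value itself, which is why it can only appear after the SparkSession has been created; placing it at the top of the file will not compile.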
mikeL