
I load my CSV into a DataFrame and then convert it to a Dataset, but it shows errors like this:

Multiple markers at this line:
- Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
- not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[DataSet.spark.aacsv])org.apache.spark.sql.Dataset[DataSet.spark.aacsv]. Unspecified value parameter evidence$2

How can I resolve this? My code is -

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class aaCSV(a: String, b: String)

object WorkShop {

  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("readCSV")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val customSchema = StructType(Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true)))

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .schema(customSchema)
      .load("/xx/vv/ss.csv")
    df.printSchema()
    df.show()
    val googleDS = df.as[aaCSV]
    googleDS.show()

  }

}

Now I changed the main function like this -

def main(args: Array[String]) = {
  val conf = new SparkConf()
    .setAppName("readCSV")
    .setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  // bring the implicit Encoders into scope before calling .as[aaCSV]
  import sqlContext.implicits._

  val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
  sa.printSchema()
  sa.show()
}

But it throws this error - Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'Adj_Close' given input columns: [_c1, _c2, _c5, _c4, _c6, _c3, _c0]; line 1 pos 7. What should I do?

Also, I want to execute my method at a given time interval using the Spark scheduler. I referred to this link - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. Kindly help us.
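Based on that page, this is roughly the fair-scheduling setup I am trying (the pool name "csvPool" is just a placeholder):

// Enable the FAIR scheduler so jobs submitted from separate threads
// share resources instead of running strictly FIFO
val conf = new SparkConf()
  .setAppName("readCSV")
  .setMaster("local")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Jobs started from this thread go into a named pool
sc.setLocalProperty("spark.scheduler.pool", "csvPool")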

2 Answers


Try adding the below import before you convert the DF to a DS.

import spark.implicits._ (if you are on Spark 2.x with a SparkSession named spark)

OR

import sqlContext.implicits._
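For example, a minimal sketch applied to the code from the question (same schema and path):

val sqlContext = new SQLContext(sc)
// the implicits import must be in scope before .as[aaCSV],
// so the compiler can derive an Encoder for the case class
import sqlContext.implicits._

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(customSchema)
  .load("/xx/vv/ss.csv")

val googleDS = df.as[aaCSV] // compiles now: Encoder[aaCSV] is found implicitly
googleDS.show()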

For more info on working with Datasets, see https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets

Shankar
  • Thanks a lot buddy. I tried another approach: val sa = sqlContext.read.csv("/home/kenla/Spark_Samples/google.csv").as[googleCSV] – Sarathkumar Vulchi Oct 17 '16 at 09:48
  • But that approach throws the error "Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`Date`' given input columns: [_c3, _c4, _c0, _c1, _c5, _c6, _c2];". Kindly help us. – Sarathkumar Vulchi Oct 17 '16 at 09:51

Do you have a header (column names) in your CSV files? If yes, try adding .option("header","true") in the read statement. Example: sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV]. Without the header option, Spark assigns the default names _c0, _c1, ... - which is why your case class fields cannot be resolved.
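A fuller sketch, assuming the file's header row actually contains the columns a and b from the case class:

import sqlContext.implicits._

// with header=true the column names come from the first line of the file,
// so they can match the case class fields instead of the defaults _c0, _c1, ...
val sa = sqlContext.read
  .option("header", "true")
  .csv("/xx/vv/ss.csv")
  .as[aaCSV]
sa.printSchema()
sa.show()

Note that without an explicit schema every column is read as a string, which happens to match the String fields of aaCSV; for other types, pass .schema(customSchema) as in your original code.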

The blog below has different examples for DataFrames and Datasets: http://technippet.blogspot.in/2016/10/different-ways-of-creating.html

rishabh.bhardwaj