
When I try to build the following code with sbt package:

import org.apache.spark.sql.SparkSession

class Log(val cip: String, val scstatus: Int) {
    var src: String = cip
    var status: Int = scstatus
}

object IISHttpLogs {
  def main(args: Array[String]) {
    val logFiles = "D:/temp/tests/wwwlogs" 
    val spark = SparkSession.builder.appName("LogParser").getOrCreate()
    val sc = spark.sparkContext;
    sc.setLogLevel("ERROR")

    val logs = sc.textFile(logFiles)        

    import spark.implicits._
    val rowDF = logs.filter(l => !l.startsWith("#"))
        .map(l => l.split(" "))
        .map(c => new Log(c(8), c(11).trim.toInt))
        .toDF();
    println(s"line count: ${rowDF.count()}")        
    rowDF.createOrReplaceTempView("rows")
    val maxHit = spark.sql("SELECT top 1 src, count(*) FROM rows group by src order by count(*) desc")
    maxHit.show()

    spark.stop()
  }
}

I get the following error:

value toDF is not a member of org.apache.spark.rdd.RDD[Log]

I tried several things, like:

  • toDFlog
  • create a SQLContext and import implicits._ from this sqlContext (see the sketch below)
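
For example, the second attempt looked roughly like this (a sketch reconstructing the attempt, the exact code may have differed), and it still fails with the same toDF error:

val sqlContext = spark.sqlContext
import sqlContext.implicits._

// still does not compile: no Encoder can be derived for the plain Log class
val rowDF = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => new Log(c(8), c(11).trim.toInt))
    .toDF()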

I just can't compile my code.

Any clue to overcome this error is welcome.


I did read Generate a Spark StructType / Schema from a case class and wrote:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema =
    StructType(
        StructField("src", StringType, false) ::
        StructField("status", IntegerType, true) :: Nil)

val rowRDD = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => Row(c(8), c(11).trim.toInt))

val rowDF = spark.sqlContext.createDataFrame(rowRDD, schema)

but doing so I do not use the Log class. I would like to know if there is a way to get the DataFrame by using the defined Log class, or if the official/best way is to use the Row class?

For example, I can't write:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val rowRDD = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => new Log(c(8), c(11).trim.toInt))
val rowDF = spark.sqlContext.createDataFrame(
    rowRDD,
    ScalaReflection.schemaFor[Log].dataType.asInstanceOf[StructType])

And I just can't figure out why.

tschmit007
    Possible duplicate of [How to convert rdd object to dataframe in spark](https://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – user10938362 May 27 '19 at 15:51

1 Answer


You have to use a case class. At least that worked for me:

case class Log(cip: String, scstatus: Int)
//...
.map(c => Log(c(8), c(11).trim.toInt)) // omit 'new'
.toDF()

I'm not quite sure whether this is a general rule. However, in the announcement of the Dataset API, the usage of case classes is mentioned explicitly:

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans. (https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html)
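
Applied to the code in the question, the whole program would then look roughly like this (a sketch, not tested against your data; note that Spark SQL uses LIMIT rather than TOP):

import org.apache.spark.sql.SparkSession

// Define the case class at the top level (not inside main), so that Spark can
// derive an Encoder for it via spark.implicits._
case class Log(cip: String, scstatus: Int)

object IISHttpLogs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("LogParser").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    val logs = sc.textFile("D:/temp/tests/wwwlogs")

    import spark.implicits._
    val rowDF = logs.filter(l => !l.startsWith("#"))
      .map(l => l.split(" "))
      .map(c => Log(c(8), c(11).trim.toInt)) // no 'new' for a case class
      .toDF()

    rowDF.createOrReplaceTempView("rows")
    spark.sql("SELECT src, count(*) AS hits FROM rows GROUP BY src ORDER BY hits DESC LIMIT 1").show()

    spark.stop()
  }
}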

If you cannot use a case class, this answer seems to be appropriate.

moe