When I try to build the following package with sbt package:
import org.apache.spark.sql.SparkSession

class Log(val cip: String, val scstatus: Int) {
  var src: String = cip
  var status: Int = scstatus
}

object IISHttpLogs {
  def main(args: Array[String]): Unit = {
    val logFiles = "D:/temp/tests/wwwlogs"
    val spark = SparkSession.builder.appName("LogParser").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    val logs = sc.textFile(logFiles)

    import spark.implicits._
    val rowDF = logs.filter(l => !l.startsWith("#"))
      .map(l => l.split(" "))
      .map(c => new Log(c(8), c(11).trim.toInt))
      .toDF()
    println(s"line count: ${rowDF.count()}")

    rowDF.createOrReplaceTempView("rows")
    val maxHit = spark.sql("SELECT src, count(*) FROM rows GROUP BY src ORDER BY count(*) DESC LIMIT 1")
    maxHit.show()

    spark.stop()
  }
}
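For reference, the project is built with a minimal build.sbt along these lines (the version numbers are only illustrative, not necessarily the exact ones I use; the relevant part is that spark-sql, which provides SparkSession and toDF, is on the classpath):

name := "IISHttpLogs"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided"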
I get the following error:
value toDF is not a member of org.apache.spark.rdd.RDD[Log]
I tried several things, such as:
- toDF(log)
- creating a SQLContext and importing implicits._ from that sqlContext (a rough sketch of that attempt is shown below)

but I just can't compile my code. Any clue to get past this error would be welcome.
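For clarity, the second attempt looked roughly like this (the same pipeline as above, only the source of the implicits changes), and it fails with the same error:

val sqlContext = spark.sqlContext
import sqlContext.implicits._

val rowDF = logs.filter(l => !l.startsWith("#"))
  .map(l => l.split(" "))
  .map(c => new Log(c(8), c(11).trim.toInt))
  .toDF() // still: value toDF is not a member of org.apache.spark.rdd.RDD[Log]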
I did read Generate a Spark StructType / Schema from a case class and wrote:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema =
  StructType(
    StructField("src", StringType, false) ::
    StructField("status", IntegerType, true) :: Nil)

val rowRDD = logs.filter(l => !l.startsWith("#"))
  .map(l => l.split(" "))
  .map(c => Row(c(8), c(11).trim.toInt))

val rowDF = spark.sqlContext.createDataFrame(rowRDD, schema)
but doing so I do not use the Log class. I would like to know whether there is a way to get the DataFrame using the defined Log class, or whether the official/best way is to use the Row class.
For example, I can't write:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val rowRDD = logs.filter(l => !l.startsWith("#"))
  .map(l => l.split(" "))
  .map(c => new Log(c(8), c(11).trim.toInt))

val rowDF = spark.sqlContext.createDataFrame(
  rowRDD,
  ScalaReflection.schemaFor[Log].dataType.asInstanceOf[StructType])

And I just can't figure out why.
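My guess is that the difference comes down to whether an Encoder is available: spark.implicits._ seems to provide one for case classes (Products), but not for a plain class like Log. For comparison, here is the case-class variant I have in mind (the name LogRecord is only for illustration); as far as I understand, it gets its Encoder from spark.implicits._ and therefore compiles:

// Defined at the top level, next to Log, not inside main.
case class LogRecord(src: String, status: Int)

// Inside main, with import spark.implicits._ in scope as above:
val caseDF = logs.filter(l => !l.startsWith("#"))
  .map(l => l.split(" "))
  .map(c => LogRecord(c(8), c(11).trim.toInt))
  .toDF()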