0

I've imported a Json file which has this schema :

sqlContext.read.json("filename").printSchema 
   root 
 |-- COL: long (nullable = true) 
 |-- DATA: array (nullable = true) 
 |    |-- element: struct (containsNull = true) 
 |    |    |-- Crate: string (nullable = true) 
 |    |    |-- MLrate: string (nullable = true) 
 |    |    |-- Nrout: string (nullable = true) 
 |    |    |-- up: string (nullable = true) 
 |-- IFAM: string (nullable = true) 
 |-- KTM: long (nullable = true) 

I'm new on Spark and I want to perform basic statistics like

  • getting the min, max, mean, median and std of numeric variables
  • getting the values frequencies for non-numeric variables.

My questions are :

  • How to change the type of my variables in my schema, from 'string' to 'numeric' ? (Crate, MLrate and Nrout should be numeric variables) ?
  • How to do those basic statistics easily ?
SparkUser
  • 157
  • 1
  • 2
  • 11

1 Answers1

1

How to change the type of my variables in my schema, from 'string' to 'numeric' ? (Crate, MLrate and Nrout should be numeric variables) ?

You can create schema manually and apply it to the existing RDD. I assume your data is stored in a df variable and has the same structure as an example from your previous question:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("COL", LongType, true) ::
    StructField("DATA", ArrayType(StructType(
        StructField("Crate", IntegerType, true) ::
        StructField("MLrate", IntegerType, true) ::
        StructField("Nrout", IntegerType, true) ::
        StructField("up", IntegerType, true) ::
        Nil
    ), true), true) :: 
    StructField("IFAM", StringType, true) :: 
    StructField("KTM", LongType, true) :: 
    Nil
)

def convert(row: Row) = {
    val col = row.get(0)
    val data: Seq[Row] = row.getSeq(1)
    val rowData = data.map(r => Row.fromSeq(r.toSeq.map{
        case v: String => v.toInt
        case _ => null
    })).toList
    val ifam = row.get(2)
    val ktm = row.get(3)
    Row(col, rowData, ifam, ktm)
}

val updatedDf = sqlContext.applySchema(df.rdd.map(convert), schema)
updatedDf.printSchema

and we get expected output:

root
 |-- COL: long (nullable = true)
 |-- DATA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Crate: integer (nullable = true)
 |    |    |-- MLrate: integer (nullable = true)
 |    |    |-- Nrout: integer (nullable = true)
 |    |    |-- up: integer (nullable = true)
 |-- IFAM: string (nullable = true)
 |-- KTM: long (nullable = true)

You can adjust numeric type (DecimalType, DoubleType according to your requirements).

getting the min, max, mean, median and std of numeric variables How to do those basic statistics easily ?

The simplest way to obtain statistics for numeric variables is to use describe method:

updatedDf.describe().show

and you get nicely formatted output:

+-------+----+-------------+
|summary| COL|          KTM|
+-------+----+-------------+
|  count|   2|            2|
|   mean|21.5| 1.4300064E12|
| stddev| 0.5|         null|
|    min|  21|1430006400000|
|    max|  22|1430006400000|
+-------+----+-------------+

If you need an output you can access programmatically you can org.apache.spark.sql.functions import org.apache.spark.sql.functions

import org.apache.spark.sql.functions._

df.agg(
    min("KTM").alias("KTM_min"),
    max("KTM").alias("KTM_max"),
    mean("KTM").alias("KTM_mean")).show

None of the above will work with array field though. To work with these you'll probably need an udf or flatten your structure first.

val flattenedSchema = StructType(
    StructField("COL", LongType, true) ::
    StructField("Crate", IntegerType, true) ::
    StructField("MLrate", IntegerType, true) ::
    StructField("Nrout", IntegerType, true) ::
    StructField("up", IntegerType, true) ::
    StructField("IFAM", StringType, true) :: 
    StructField("KTM", LongType, true) :: 
    Nil
)

def flatten(row: Row) = {
    val col = row.get(0)
    val data: Seq[Row] = row.getSeq(1)
    val ifam = row.get(2)
    val ktm = row.get(3)

    data.map(dat => {
        val crate = dat.get(0)
        val mlrate = dat.get(1)
        val nrout = dat.get(2)
        val up = dat.get(3)
        Row(col, crate, mlrate, nrout, up, ifam, ktm)
    })
}

val updatedFlatDf = sqlContext.
    applySchema(updatedDf.rdd.flatMap(flatten), flattenedSchema)

updatedFlatDf.describe().show

Now you can stats for each field:

+-------+----+------------------+------------------+------------------+----+-------------+
|summary| COL|             Crate|            MLrate|             Nrout|  up|          KTM|
+-------+----+------------------+------------------+------------------+----+-------------+
|  count|  12|                12|                12|                12|   0|           12|
|   mean|21.5|2.1666666666666665|             31.75|2.3333333333333335|null| 1.4300064E12|
| stddev| 0.5|1.2133516482134201|2.5535922410074345| 3.223179934302286|null|         null|
|    min|  21|                 0|                30|                 0|null|1430006400000|
|    max|  22|                 5|                38|                 8|null|1430006400000|
+-------+----+------------------+------------------+------------------+----+-------------+

getting the min, max, mean, median and std of numeric variables

Getting quantiles, including median, is usually far to expensive for large datasets. If you really have to compute median you may my answer for How to find median using Spark useful. It is written in Python but pretty easy to implement in Scala as well. A little bit less comprehensive answer in Scala has been provided by Eugene Zhulenev here.

EDIT:

If you want to convert nrout to date you can replace rowData inside convert with something like this:

val rowData = data.map(dat => {
    val crate = dat.get(0).toInt
    val mlrate = dat.get(1).toInt
    val nrout = java.sql.Timestamp.valueOf(dat.get(2))
    val up = dat.get(3).toInt
    Row(crate, mlrate, nrout, up)
})

and adjust schema:

StructField("Nrout", TimestampType, true)
Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • My variable "Nrout" has values like "2012-11-01 20:13:00.0" so I try to put it in StringType in my Schema but I got an error.... Is there a DateTime format? – SparkUser Jul 31 '15 at 14:00
  • Yes, there is `DateType`, but you'll have to convert your field to `java.sql.Date`. – zero323 Jul 31 '15 at 14:06
  • You mean I have to "import java.sql.Date" before ? – SparkUser Jul 31 '15 at 14:11
  • I do not exactly understand what you mean with java.sql.Timestamp.valueOf(_), can you give an example or correct directly in your code? – SparkUser Jul 31 '15 at 15:04