How to change the type of my variables in my schema, from 'string' to 'numeric'? (Crate, MLrate and Nrout should be numeric variables)
You can create a schema manually and apply it to the existing RDD. I assume your data is stored in a df
variable and has the same structure as the example from your previous question:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
  StructField("COL", LongType, true) ::
  StructField("DATA", ArrayType(StructType(
    StructField("Crate", IntegerType, true) ::
    StructField("MLrate", IntegerType, true) ::
    StructField("Nrout", IntegerType, true) ::
    StructField("up", IntegerType, true) ::
    Nil
  ), true), true) ::
  StructField("IFAM", StringType, true) ::
  StructField("KTM", LongType, true) ::
  Nil
)
def convert(row: Row) = {
  val col = row.get(0)
  val data: Seq[Row] = row.getSeq(1)
  // Cast every string inside the nested rows to Int; anything else becomes null
  val rowData = data.map(r => Row.fromSeq(r.toSeq.map {
    case v: String => v.toInt
    case _ => null
  })).toList
  val ifam = row.get(2)
  val ktm = row.get(3)
  Row(col, rowData, ifam, ktm)
}
val updatedDf = sqlContext.applySchema(df.rdd.map(convert), schema)
updatedDf.printSchema
and we get the expected output:
root
|-- COL: long (nullable = true)
|-- DATA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Crate: integer (nullable = true)
| | |-- MLrate: integer (nullable = true)
| | |-- Nrout: integer (nullable = true)
| | |-- up: integer (nullable = true)
|-- IFAM: string (nullable = true)
|-- KTM: long (nullable = true)
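As a side note, applySchema is deprecated as of Spark 1.3; on a newer version you can pass the same arguments to createDataFrame instead:

val updatedDf = sqlContext.createDataFrame(df.rdd.map(convert), schema)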
You can adjust the numeric type (DecimalType, DoubleType) according to your requirements.
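For example, switching MLrate to a double would only change the field definition and the matching cast inside convert (a sketch of the same pattern, not a full listing):

// in the schema
StructField("MLrate", DoubleType, true) ::

// in convert
case v: String => v.toDouble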
getting the min, max, mean, median and std of numeric variables
How to do those basic statistics easily?
The simplest way to obtain statistics for numeric variables is to use the describe method:
updatedDf.describe().show
and you get nicely formatted output:
+-------+----+-------------+
|summary| COL| KTM|
+-------+----+-------------+
| count| 2| 2|
| mean|21.5| 1.4300064E12|
| stddev| 0.5| null|
| min| 21|1430006400000|
| max| 22|1430006400000|
+-------+----+-------------+
If you need an output you can access programmatically, you can use functions from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._
df.agg(
  min("KTM").alias("KTM_min"),
  max("KTM").alias("KTM_max"),
  mean("KTM").alias("KTM_mean")).show
None of the above will work with the array field though. To work with it you'll probably need a UDF (see the sketch below) or flatten your structure first.
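Here is a rough sketch of the UDF route, assuming the array of structs reaches the Scala function as a Seq[Row] and using an illustrative name avgMLrate:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// average the MLrate values (second field of each struct) inside DATA
val avgMLrate = udf((data: Seq[Row]) => {
  val xs = data.map(_.getInt(1))
  xs.sum.toDouble / xs.size
})

updatedDf.select(avgMLrate(col("DATA")).alias("MLrate_mean")).show

Flattening, on the other hand, can look like this: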
val flattenedSchema = StructType(
  StructField("COL", LongType, true) ::
  StructField("Crate", IntegerType, true) ::
  StructField("MLrate", IntegerType, true) ::
  StructField("Nrout", IntegerType, true) ::
  StructField("up", IntegerType, true) ::
  StructField("IFAM", StringType, true) ::
  StructField("KTM", LongType, true) ::
  Nil
)
def flatten(row: Row) = {
  val col = row.get(0)
  val data: Seq[Row] = row.getSeq(1)
  val ifam = row.get(2)
  val ktm = row.get(3)
  // Emit one flat row per element of the DATA array
  data.map(dat => {
    val crate = dat.get(0)
    val mlrate = dat.get(1)
    val nrout = dat.get(2)
    val up = dat.get(3)
    Row(col, crate, mlrate, nrout, up, ifam, ktm)
  })
}
val updatedFlatDf = sqlContext.
  applySchema(updatedDf.rdd.flatMap(flatten), flattenedSchema)
updatedFlatDf.describe().show
Now you can get stats for each field:
+-------+----+------------------+------------------+------------------+----+-------------+
|summary| COL| Crate| MLrate| Nrout| up| KTM|
+-------+----+------------------+------------------+------------------+----+-------------+
| count| 12| 12| 12| 12| 0| 12|
| mean|21.5|2.1666666666666665| 31.75|2.3333333333333335|null| 1.4300064E12|
| stddev| 0.5|1.2133516482134201|2.5535922410074345| 3.223179934302286|null| null|
| min| 21| 0| 30| 0|null|1430006400000|
| max| 22| 5| 38| 8|null|1430006400000|
+-------+----+------------------+------------------+------------------+----+-------------+
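Once the frame is flat, the aggregation functions shown earlier work on the former array fields as well, for example:

updatedFlatDf.agg(
  min("Crate").alias("Crate_min"),
  max("Crate").alias("Crate_max"),
  mean("Crate").alias("Crate_mean")).show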
getting the min, max, mean, median and std of numeric variables
Getting quantiles, including the median, is usually far too expensive for large datasets. If you really have to compute a median you may find my answer to How to find median using Spark useful. It is written in Python but is pretty easy to implement in Scala as well. A slightly less comprehensive answer in Scala has been provided by Eugene Zhulenev here.
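Along those lines, an exact median of a single numeric column can be sketched by sorting and indexing the underlying RDD (a rough illustration, not the linked answers verbatim, and expensive because of the full sort):

def median(rdd: org.apache.spark.rdd.RDD[Int]): Double = {
  // sort, attach positions, and key by position so the middle element(s) can be looked up
  val indexed = rdd.sortBy(identity).zipWithIndex().map(_.swap)
  val n = indexed.count()
  if (n % 2 == 1) indexed.lookup(n / 2).head.toDouble
  else (indexed.lookup(n / 2 - 1).head + indexed.lookup(n / 2).head) / 2.0
}

median(updatedFlatDf.select("MLrate").rdd.map(_.getInt(0)))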
EDIT:
If you want to convert nrout to a date you can replace rowData inside convert with something like this:
val rowData = data.map(dat => {
  val crate = dat.getString(0).toInt
  val mlrate = dat.getString(1).toInt
  // Timestamp.valueOf expects a "yyyy-mm-dd hh:mm:ss[.f...]" formatted string
  val nrout = java.sql.Timestamp.valueOf(dat.getString(2))
  val up = dat.getString(3).toInt
  Row(crate, mlrate, nrout, up)
})
and adjust the schema:
StructField("Nrout", TimestampType, true)