
I have some data that looks like this:

from local_spark import sc, sqlContext
rdd = sc.parallelize([
                        ("key1", 'starttime=10/01/2015', 'param1', '1,2,3,99,88'),
                        ("key2", 'starttime=10/02/2015', 'param1', '11,12'),
                        ("key1", 'starttime=10/01/2015', 'param2', '66,77')
                    ])

The last field is a comma-separated list of values (one value per second) that can be very large.

What I need to do is group the dataset by key and then flatMap it. The expected result would be something like this:

(key1)     # rdd key

# for each key, a table with the values
key    timestamp    param1   param2
key1   10/01/2015   1        66
key1   10/01/2015   2        77
key1   10/01/2015   3        null
key1   10/01/2015   99       null
key1   10/01/2015   88       null


(key2)    # rdd key
key    timestamp    param1   param2
key2   10/02/2015   11       null
key2   10/02/2015   12       null

So far, what I have tried to do is something like this: rdd.groupByKey().flatMap(functionToParseValuesAndTimeStamps)

If I do something like this, would the results of the flatMap operation still be grouped by key? Would I "lose" the group by operation?

Note: a more naive approach would be to flatMap first and then group by key. But since there are far fewer keys than values, I think this would result in poor performance.
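
For reference, here is a rough PySpark sketch of the group-then-flatMap approach; the body of functionToParseValuesAndTimeStamps is only an illustration of the parsing I have in mind, and note the extra map, since groupByKey needs a pair RDD:

def functionToParseValuesAndTimeStamps(key_and_records):
    # key_and_records is (key, iterable of (starttime, param, values)) after groupByKey
    key, records = key_and_records
    timestamp = None
    rows = {}  # position in the comma-separated list -> {param name: value}
    for starttime, param, values in records:
        timestamp = starttime.split('=')[1]
        for i, v in enumerate(values.split(',')):
            rows.setdefault(i, {})[param] = v
    # one output row per value position; missing params become None
    return [(key, timestamp, row.get('param1'), row.get('param2'))
            for _, row in sorted(rows.items())]

result = (rdd
          .map(lambda t: (t[0], (t[1], t[2], t[3])))   # make it a pair RDD: key -> record
          .groupByKey()
          .flatMap(functionToParseValuesAndTimeStamps))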

guilhermecgs
    What's the relation between the source data and the result? How does this record: `"key1", 'starttime=10/01/2015', 'param1', '1,2,3,99,88'` turn into `key1 10/01/2015 1 66`? – maasg Nov 09 '16 at 18:04

2 Answers


I would suggest converting to a DataFrame and writing a UDF to split the columns; after some wrangling you can follow this code, which I took and edited according to your data. Copy and paste it into spark-shell to play with it.

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
import org.apache.spark.sql.functions.udf

val df2 = Seq( ("key1", "11", "12"),
               ("key2", "66", "77")).toDF("keys", "num1", "num2")


// Reshape from wide to long: every non-"by" column becomes a (key, val) row.
def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  // keep only the columns that are not in `by`; they must all share one type
  val (cols, types) = df.dtypes.filter { case (c, _) => !by.contains(c) }.unzip
  require(types.distinct.size == 1, "all non-key columns must have the same type")

  // build an array of (column-name, value) structs and explode it,
  // producing one output row per melted column
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

val x = toLong(df2, Seq("keys"))

Here's how it looks.
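For the sample `df2` above, `x.show()` should print roughly:

+----+----+---+
|keys| key|val|
+----+----+---+
|key1|num1| 11|
|key1|num2| 12|
|key2|num1| 66|
|key2|num2| 77|
+----+----+---+

(The `key` / `val` column names come from the struct aliases inside `toLong`.)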

xem

flatMap does not keep the groups...

So, if you flatMap something after a groupByKey() operation, the resulting RDD records will NOT be grouped.
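
For example, here is a minimal PySpark sketch (assuming a shell where `sc` is available) showing that the grouped structure is flattened back into plain records:

pairs = sc.parallelize([("key1", 1), ("key1", 2), ("key2", 3)])

grouped = pairs.groupByKey()   # one record per key: (key, iterable of values)
flattened = grouped.flatMap(lambda kv: [(kv[0], v * 10) for v in kv[1]])

print(grouped.mapValues(list).collect())   # e.g. [('key1', [1, 2]), ('key2', [3])]
print(flattened.collect())                 # e.g. [('key1', 10), ('key1', 20), ('key2', 30)] -- flat, no groups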

guilhermecgs