10

I have a very big table of time series data that have these columns:

  • Timestamp
  • LicensePlate
  • UberRide#
  • Speed

Each collection of LicensePlate/UberRide data should be processed considering the whole set of data. In other words, I do not need to process the data row by row, but all rows grouped by (LicensePlate/UberRide) together.

I am planning to use Spark with the DataFrame API, but I am confused about how to perform a custom calculation over a grouped Spark DataFrame.

What I need to do is:

  1. Get all data
  2. Group by some columns
  3. For each Spark DataFrame group, apply a function f(x), returning a custom object per group
  4. Combine the results by applying g(x), returning a single custom object

How can I do steps 3 and 4? Any hints on which Spark API (DataFrame, Dataset, RDD, maybe pandas...) I should use?

The whole workflow can be seen below:

[Workflow diagram]

guilhermecgs
  • 2,913
  • 11
  • 39
  • 69
  • 1
    Pandas is not part of Spark. You can use `DataFrame`, but you'll [have to do it in Scala](http://stackoverflow.com/a/32101530/1560062) and [add a Python wrapper](http://stackoverflow.com/a/33257733/1560062); RDDs should work just fine. – zero323 Sep 20 '16 at 17:23
  • Can't I use Spark directly? I am using Spark 1.6.2 – guilhermecgs Sep 20 '16 at 17:29
  • If you mean PySpark then like I said - RDDs should work just fine. – zero323 Sep 20 '16 at 17:31
  • I will further investigate exactly how I can use RDDs before asking newbie questions... :-) – guilhermecgs Sep 20 '16 at 17:41
  • @guilhermecgs were you able to achieve this with PySpark? I am trying to do something similar and I am stuck. Would love to hear your thoughts on this. – Nischal Hp Mar 09 '17 at 07:11
  • 2
    @NischalHp : df.rdd.keyBy(lambda x: (x['key1'], x['key2'])) \ .groupByKey() \ .map(lambda grouped_data: my_map_fn(grouped_data)) – guilhermecgs Mar 09 '17 at 18:39
  • @guilhermecgs okay, this is awesome. I am going to give it a shot. thank you – Nischal Hp Mar 20 '17 at 13:43

2 Answers

12

What you are looking for has existed since Spark 2.3: pandas vectorized UDFs. They let you group a DataFrame and apply custom transformations with pandas, distributed over each group:

df.groupBy("groupColumn").apply(myCustomPandasTransformation)

It is very easy to use, so I will just put a link to Databricks' presentation of pandas UDFs; a minimal sketch is also shown below.
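
For illustration, here is a minimal sketch of such a grouped-map pandas UDF, assuming the column names from the question (LicensePlate, UberRide, Speed) and a made-up per-ride summary; the schema and the aggregation are placeholders for your own f(x):

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # One output row per (LicensePlate, UberRide) group; the schema here is an assumption.
    schema = "LicensePlate string, UberRide string, maxSpeed double, avgSpeed double"

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def summarize_ride(pdf):
        # pdf is a pandas DataFrame containing all rows of one group
        return pd.DataFrame([{
            "LicensePlate": pdf["LicensePlate"].iloc[0],
            "UberRide": pdf["UberRide"].iloc[0],
            "maxSpeed": pdf["Speed"].max(),
            "avgSpeed": pdf["Speed"].mean(),
        }])

    result = df.groupBy("LicensePlate", "UberRide").apply(summarize_ride)

On Spark 3.x the same idea is usually written as df.groupBy(...).applyInPandas(summarize_ride, schema).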

However, I don't know of such a practical way to make grouped transformations in Scala yet, so any additional advice is welcome.

EDIT: in Scala, you can achieve the same thing on earlier versions of Spark using Dataset's groupByKey + mapGroups/flatMapGroups.

Florent F
  • 591
  • 6
  • 16
10
  • While Spark provides some ways to integrate with Pandas, it doesn't make Pandas distributed. So whatever you do with Pandas in Spark is simply a local operation (either on the driver, or on an executor when used inside a transformation).

    If you're looking for a distributed system with Pandas-like API you should take a look at dask.

  • You can define User Defined Aggregate Functions or Aggregators to process grouped Datasets, but this part of the API is directly accessible only in Scala. It is not that hard to write a Python wrapper once you create one.
  • The RDD API provides a number of functions which can be used to perform operations on groups, starting with the low-level repartition / repartitionAndSortWithinPartitions and ending with a number of *byKey methods (combineByKey, groupByKey, reduceByKey, etc.).

    Which one is applicable in your case depends on the properties of the function you want to apply (is it associative and commutative, can it work on streams, does it expect a specific order).

    The most general but inefficient approach can be summarized as follows:

    h(rdd.keyBy(f).groupByKey().mapValues(g).collect())
    

    where f maps a value to its key, g performs the per-group aggregation, and h is the final merge. Most of the time you can do much better than that, so it should be used only as a last resort (a minimal PySpark sketch of this pattern follows after this list).

  • Relatively complex logic can be expressed using DataFrames / Spark SQL and window functions (a short window sketch also follows after this list).

  • See also Applying UDFs on GroupedData in PySpark (with a working Python example).
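
For concreteness, here is a minimal PySpark sketch of that last pattern, assuming the question's columns (LicensePlate, UberRide, Speed) and purely illustrative choices of f, g and h:

    # df is the original DataFrame; f builds the group key, g aggregates one group,
    # and h merges the collected per-group results on the driver.
    def f(row):
        return (row["LicensePlate"], row["UberRide"])

    def g(rows):
        # rows is an iterable of all Row objects in one group
        speeds = [r["Speed"] for r in rows]
        return {"count": len(speeds), "max_speed": max(speeds)}

    def h(pairs):
        # pairs is a list of ((plate, ride), per_group_result) tuples
        return dict(pairs)

    result = h(df.rdd.keyBy(f).groupByKey().mapValues(g).collect())

As noted above, groupByKey shuffles every row of a group to a single executor and collect brings the merged results to the driver, so prefer reduceByKey / combineByKey or a pandas UDF when your f(x) allows it.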
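
And a rough sketch of the window-function route, again assuming the question's columns; here each row simply gets per-ride statistics attached (adapt to your actual logic):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Window covering the whole (LicensePlate, UberRide) group
    w = Window.partitionBy("LicensePlate", "UberRide")

    with_stats = (df
        .withColumn("avg_speed", F.avg("Speed").over(w))
        .withColumn("max_speed", F.max("Speed").over(w)))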

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Much of the pandas API can now run distributed on spark. See https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/supported_pandas_api.html for details. – Alexander Measure Sep 23 '22 at 12:44