
I have two large DataFrames to be merged based on an association key. Using join takes a long time to complete the task.

I see that using cogroup is preferred over joins in Apache Spark. Can anyone point out how to use cogroup on DataFrames, or suggest a better approach for merging two large DataFrames?

Thank you

Oliver W.
Kazhiyur

1 Answer


Spark >= 3.0

Since 3.0, Spark provides a PySpark-specific cogroup using Pandas / Arrow. The general syntax is as follows:

left.cogroup(right).applyInPandas(f, schema)

where both left and right are GroupedData objects, f is a cogrouped-map user-defined function that takes two Pandas DataFrames and returns a Pandas DataFrame, and schema is the schema of the output:

import pandas as pd

def f(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: ...
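
A minimal runnable sketch (the example frames, the id key, and the pandas merge inside the function are purely illustrative):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [(1, "a", 10), (1, "b", 20), (2, "c", 30)], ("id", "k", "v1"))
df2 = spark.createDataFrame(
    [(1, "a", 1.0), (2, "c", 2.0)], ("id", "k", "v2"))

def merge_groups(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Each argument holds all rows for a single key, as a Pandas DataFrame
    return pd.merge(left, right, on=["id", "k"])

result = (
    df1.groupBy("id")
       .cogroup(df2.groupBy("id"))
       .applyInPandas(merge_groups, schema="id long, k string, v1 long, v2 double")
)
result.show()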

Spark >= 1.6

The JVM KeyValueGroupedDataset provides cogroup in both Java

def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R] 

and Scala

def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R] 

It is, however, intended for the "strongly" typed variants, not Dataset[Row], and is highly unlikely to contribute to your declared goal (performance improvement).

Spark < 1.6 (this part stays valid going forward, with the exception of the small API additions listed above).

DataFrame doesn't provide any equivalent of the cogroup function, and complex objects are not first-class citizens in Spark SQL. The set of operations available on complex structures is rather limited, so typically you have to either create a custom expression, which is not trivial, or use UDFs and pay a performance penalty. Moreover, Spark SQL doesn't use the same join logic as plain RDDs.

Regarding RDDs: while there exist border cases where cogroup can be favorable over join, typically it shouldn't be, unless the join result approaches a Cartesian product of the complete dataset. After all, joins on RDDs are expressed as cogroup followed by flatMapValues, and since the latter operation is local, the only real overhead is the creation of the output tuples.
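
To illustrate that equivalence, here is a small PySpark sketch (the RDD contents are made up) showing that cogroup followed by flatMapValues reproduces an inner join:

from itertools import product
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

left = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
right = sc.parallelize([("a", "x"), ("c", "y")])

# Per key, emit the Cartesian product of the two value iterables; keys
# missing on either side yield nothing, i.e. inner-join semantics.
via_cogroup = left.cogroup(right).flatMapValues(lambda lr: product(*lr))

assert sorted(via_cogroup.collect()) == sorted(left.join(right).collect())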

If your tables contain only primitive types, you could mimic cogroup-like behavior by aggregating columns with collect_list first, but I wouldn't expect any performance gains here. A sketch of that idea follows below.
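
A minimal sketch (the frames and column names are illustrative), packing each side's rows into an array column per key before a single join:

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ("id", "v1"))
df2 = spark.createDataFrame([(1, 1.0), (3, 3.0)], ("id", "v2"))

# One row per key on each side, with that side's rows collected into an array
left = df1.groupBy("id").agg(collect_list(struct("v1")).alias("left_rows"))
right = df2.groupBy("id").agg(collect_list(struct("v2")).alias("right_rows"))

# Joining the pre-aggregated frames gives a cogroup-like
# (key, left_rows, right_rows) layout
cogrouped = left.join(right, ["id"], "full_outer")
cogrouped.show()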

10465355
zero323
  • There are a lot of cases where `cogroup` is favorable over `join`. `Cogroup` is a join operator with `groupBy` semantics; it's very useful to put data in context. Complex objects are not yet first-class citizens in Spark, but they yield tremendous performance benefits in Impala by removing the need for joins at read time. – jwinandy Dec 21 '16 at 08:07
  • @jwinandy Can you elaborate, as I have trust in zero323's comments? – thebluephantom Sep 08 '18 at 13:33