Spark >= 3.0
Since 3.0, Spark provides a PySpark-specific `cogroup` using Pandas / Arrow. The general syntax is as follows:

```python
left.cogroup(right).applyInPandas(f, schema)
```

where both `left` and `right` are `GroupedData` objects, `schema` describes the output, and `f` is a cogrouped map user-defined function that takes two Pandas `DataFrame`s and returns a Pandas `DataFrame`:
```python
import pandas as pd

def f(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: ...
```
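A minimal end-to-end sketch (the column names, sample data and the per-key merge logic are illustrative assumptions, not part of the question):

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v1"))
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))

def merge(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Called once per key, with all matching rows from both sides.
    return pd.merge(left, right, on="id")

(df1.groupBy("id")
    .cogroup(df2.groupBy("id"))
    .applyInPandas(merge, schema="id long, v1 double, v2 string")
    .show())
```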
Spark >= 1.6
The JVM `KeyValueGroupedDataset` provides both a Java variant

```scala
def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R]
```

and a Scala variant

```scala
def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R]
```
It is, however, intended for the "strongly" typed API, not `Dataset[Row]`, and is highly unlikely to contribute to your declared goal (performance improvement).
Spark < 1.6 (this part stays valid for later versions as well, with the exception of the small API additions listed above)
`DataFrame` doesn't provide any equivalent of the `cogroup` function, and complex objects are not first-class citizens in Spark SQL. The set of operations available on complex structures is rather limited, so typically you have to either create a custom expression (which is not trivial) or use UDFs and pay a performance penalty. Moreover, Spark SQL doesn't use the same `join` logic as plain RDDs.
Regarding RDDs: while there exist border cases where `cogroup` can be favorable over `join`, typically that shouldn't be the case unless the result is close to a full Cartesian product of the complete dataset. After all, joins on RDDs are expressed as `cogroup` followed by `flatMapValues`, and since the latter operation is local, the only real overhead is the creation of the output tuples.
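To illustrate the point, a small sketch of how an inner join can be expressed as `cogroup` plus a local `flatMapValues` (the sample data is made up):

```python
from itertools import product
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

left = sc.parallelize([(1, "a"), (1, "b"), (2, "c")])
right = sc.parallelize([(1, "x"), (3, "y")])

# cogroup does the shuffle; flatMapValues only pairs up values locally,
# dropping keys missing on either side (inner join semantics).
joined = left.cogroup(right).flatMapValues(lambda vs: product(vs[0], vs[1]))

print(joined.collect())            # [(1, ('a', 'x')), (1, ('b', 'x'))]
print(left.join(right).collect())  # same result
```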
If your tables contain only primitive types, you could mimic cogroup-like behavior by aggregating columns with `collect_list` first, but I wouldn't expect any performance gains here.
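A rough sketch of that workaround (the column names and the outer join are illustrative assumptions):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ("id", "v1"))
df2 = spark.createDataFrame([(1, "x"), (3, "y")], ("id", "v2"))

# Collapse each side to one row per key, then join the grouped results;
# each output row carries the full list of values from both sides.
cogrouped = (df1.groupBy("id").agg(collect_list("v1").alias("v1s"))
                .join(df2.groupBy("id").agg(collect_list("v2").alias("v2s")),
                      ["id"], "outer"))

cogrouped.show()
```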