
I have two dataframes that I need to join together using a non-equi-join (i.e. an inequality join) that has two join predicates.

One dataframe is a histogram: DataFrame[bin: bigint, lower_bound: double, upper_bound: double]
The other dataframe is a collection of observations: DataFrame[id: bigint, observation: double]

I need to determine which bin of my histogram each observation falls into, like so:

observations_df.join(
    histogram_df,
    (
        (observations_df.observation >= histogram_df.lower_bound) &
        (observations_df.observation < histogram_df.upper_bound)
    )
)

This join is very slow and I'm looking for suggestions as to how I can make it quicker.

Below is some sample code that demonstrates the problem. observations_df contains 100000 rows; when the number of rows in histogram_df becomes suitably large (say number_of_bins = 500000), the join becomes very, very slow, and I'm certain it's because I'm doing a non-equi-join. If you run this code, play around with the value of number_of_bins: start with something low, then increase it until the slow performance is noticeable.

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lit, col, lead, rand
spark = SparkSession \
    .builder \
    .getOrCreate()

number_of_bins = 500000

bin_width = 1.0 / number_of_bins
window = Window.orderBy('bin')
histogram_df = spark.range(0, number_of_bins)\
    .withColumnRenamed('id', 'bin')\
    .withColumn('lower_bound', lit(bin_width) * col('bin'))\
    .select('bin', 'lower_bound', lead('lower_bound', 1, 1.0).over(window).alias('upper_bound'))
observations_df = spark.range(0, 100000).withColumn('observation', rand())
observations_df.join(
    histogram_df,
    (
        (observations_df.observation >= histogram_df.lower_bound) &
        (observations_df.observation < histogram_df.upper_bound)
    )
).groupBy('bin').count().head(15)
  • Also [Optimize Spark job that has to calculate each to each entry similarity and output top N similar items for each](https://stackoverflow.com/q/50088548) can offer some hints. – zero323 Oct 01 '18 at 22:11
  • Thanks, yes, your first link does look useful. I shall give that a try and report back. – jamiet Oct 02 '18 at 05:18
  • I haven't got my scenario working yet but https://stackoverflow.com/questions/43483576/how-to-improve-broadcast-join-speed-in-spark is definitely a similar problem hence I'll accept that as the answer. Thank you @user6910411 . – jamiet Oct 02 '18 at 08:14

1 Answer


A non-equi-join is generally not recommended in Spark. For this kind of operation I usually generate a new column to use as an equi-join key. In your case, however, you do not need a join at all to determine which bin of the histogram each observation falls into, because each bin's upper and lower bounds can be precalculated, so you can compute the bin directly from the observation.
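For example, with the contiguous, uniform-width bins of the demo code above, the bin index is simple arithmetic. A minimal sketch (binned_df is an illustrative name, and it assumes the demo's bin_width variable is in scope):

from pyspark.sql.functions import floor, col

# With contiguous bins of uniform width the bin index is just
# observation / bin_width, rounded down -- no join required.
binned_df = observations_df.withColumn('bin', floor(col('observation') / bin_width))
binned_df.groupBy('bin').count().head(15)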

What you can do is write a UDF which finds the bin for you and returns it as a new column; a sketch follows. You may refer to pyspark: passing multiple dataframe fields to udf
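Here is a minimal sketch of that idea, assuming the bins are sorted but not necessarily of uniform width, and that the boundaries are small enough to collect to the driver; bounds_bc and find_bin are illustrative names:

import bisect

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Collect the sorted lower bounds once and broadcast them to the executors.
bounds = [row.lower_bound for row in histogram_df.orderBy('bin').select('lower_bound').collect()]
bounds_bc = spark.sparkContext.broadcast(bounds)

@udf(returnType=LongType())
def find_bin(observation):
    # bisect_right gives the insertion point, so subtracting 1 yields the
    # last bin whose lower_bound <= observation (assumes in-range values).
    return bisect.bisect_right(bounds_bc.value, observation) - 1

binned_df = observations_df.withColumn('bin', find_bin('observation'))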

randomBet
  • 11
  • 2
  • Thanks for the reply. In this contrived demo, yes, the upper and lower bound of each bin can be derived; however, it IS just a demo. In my real-world scenario that is not the case. – jamiet Oct 02 '18 at 06:57
  • If you insist you need a non-equi-join, then you need to provide the sizes of the two tables you would like to join; the tuning depends on those sizes. If histogram_df is small (I assume), you can cache it and do a broadcast join (see the sketch after these comments). If histogram_df is small enough that you can put it into a dictionary (or another data structure), you can broadcast the dictionary and write a UDF using binary search to find the bin. – randomBet Oct 02 '18 at 07:10
  • The demo code above is deliberately representative of my real-world scenario, indeed the real-world scenario is actually worse because `number_of_bins` in `histogram_df` is actually >1000000. Hence a Broadcast join is unfortunately not suitable (I have tried :) ). `observations_df` has ~100000 rows in my real-world scenario hence that's how many rows I populated in the demo above. – jamiet Oct 02 '18 at 08:07
  • I was wondering about this myself: if you do a non-equi-join, how does Catalyst handle it? – thebluephantom Oct 02 '18 at 10:03
  • But what if you do want an unequal join? – thebluephantom Oct 02 '18 at 12:02
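For reference, the broadcast join suggested in the comments above would look like the sketch below. It only helps if histogram_df is small enough to replicate to every executor, which, as noted, it is not in the asker's real-world scenario:

from pyspark.sql.functions import broadcast

# Hint that histogram_df should be shipped to every executor; Spark then
# plans a broadcast nested loop join for the inequality condition.
observations_df.join(
    broadcast(histogram_df),
    (observations_df.observation >= histogram_df.lower_bound) &
    (observations_df.observation < histogram_df.upper_bound)
).groupBy('bin').count().head(15)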