1

I'm trying to perform a isin filter as optimized as possible. Is there a way to broadcast collList using Scala API?

Edit: I'm not looking for an alternative, I know them, but I need isin so my RelationProviders will pushdown the values.

  val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
  //collList.size == 200.000
  val retTable = df.filter(col("col1").isin(collList: _*))

The list i'm passing to the "isin" method has upto ~200.000 unique elements.

I know this doesn't look like the best option and a join sounds better, but I need those elements pushed down into the filters, makes a huge difference when reading (my storage is Kudu, but it also applies to HDFS+Parquet, base data is too big and queries work on around 1% of that data), I already measured everything, and it saved me around 30minutes execution time :). Plus my method already takes care if the isin is larger than 200.000.

My problem is, I'm getting some Spark "task are too big" (~8mb per task) warnings, everything works fine so not a big deal, but I'm looking to remove them and also optimize.

I've tried with, which does nothing as I still get the warning (since the broadcasted var gets resolved in Scala and passed to vargargs I guess):

  val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
  val retTable = df.filter(col("col1").isin(sc.broadcast(collList).value: _*))

And this one which doesn't compile:

  val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
  val retTable = df.filter(col("col1").isin(sc.broadcast(collList: _*).value))

And this one which doesn't work (task too big still appears)

  val broadcastedList=df.sparkSession.sparkContext.broadcast(collList.map(lit(_).expr))
  val filterBroadcasted=In(col("col1").expr, collList.value)
  val retTable = df.filter(new Column(filterBroadcasted))

Any ideas on how to broadcast this variable? (hacks allowed). Any alternative to the isin which allows filter pushdown is also valid I've seen some people doing it on PySpark, but the API is not the same.

PS: Changes on the storage are not possible, I know partitioning (already partitioned, but not by that field) and such could help, but user inputs are totally random and the data is accessed and changed my many clients.

BiS
  • 501
  • 4
  • 17

2 Answers2

0

I'd opt for dataframe broad cast hash join in this case instead of broadcast variable.

Prepare a dataframe with your collectedDf("col1") collection list you want to filter with isin and then use join between 2 dataframes to filter the rows matching.

enter image description here

I think it would be more efficient than isin since you have 200k entries to be filtered. spark.sql.autobroadcastjointhreshhold is the property you need to set with appropriate size(by default 10mb). AFAIK you can use till 200mb or 3oomb based on your requirements.

see this BHJ Explanation of how it works

Further reading Spark efficiently filtering entries from big dataframe that exist in a small dataframe

Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121
  • Hi Ram, as I said in the question, while this answer is valid for the common case, that's what I do when the input data is bigger than 200k elements (actually my input data is a dataframe), but for this concrete case, doing a left join (thus reading the whole table) takes 30minutes, 200k elements (usually less, around 50k) isin takes ~2minutes to "broadcast" the isin and ~3-5minutes to read. I can't read less columns as I need all, and I can't reduce the data enough using another pushdown filters – BiS Apr 09 '20 at 10:47
  • Aka the main point here is predicate pushdown, I need the predicates to be pushed to the server (Kudu) so I don't need to retrieve 10-20x more data than the data I'm actually looking for. If there's a way to force leftsemi joins (which is what I should use here) to pushdown it would work too. – BiS Apr 09 '20 at 11:01
  • I dont know much about kudu and how it works. So I could not able to understand your PP part, but in general if you use parquet and if you use where condition it will use predicate pushdown. may be you want to like that I am not sure. – Ram Ghadiyaram Apr 09 '20 at 15:50
  • Yeah, the thing is that in order to use the "where condition", Spark has to send big tasks (which contain a copy of the big where list, upto 200.000 elements) and it seems Spark was not designed to allow broadcasting on those lists (I was surprised too, I thought the limit would be ~1000 elements, but it works fine upto 400k [didn't test more because around 400.000 in there isin elements I don't get performance improvements for my usecase] – BiS Apr 09 '20 at 16:26
0

I'll just leave with big tasks since I only use it twice (but saves a lot of time) in my program and I can afford it, but if someone else needs it badly... well this seems to be the path.

Best alternatives I found to have big-arrays pushdown:

  1. Change your relation provider so it broadcasts big-lists when pushing down In filters, this will probably leave some broadcasted trash, but well..., as long as your app is not streaming, it shouldn't be a problem, or you can save in a global list and clean those after a while
  2. Add a filter in Spark (I wrote something at https://issues.apache.org/jira/browse/SPARK-31417 ) which allows broadcasted pushdown all the way to your relation provider. You would have to add your custom predicate, then implement your custom "Pushdown" (you can do this by adding a new rule) and then rewrite your RDD/Relation provider so it can exploit the fact the variable is broadcasted.
  3. Use coalesce(X) after reading to decrease number of tasks, can work sometimes, depends on how the RelationProvider/RDD is implemented.
BiS
  • 501
  • 4
  • 17