
Spark version - 2.2.1.

I've created a bucketed table with 64 buckets and I'm running an aggregation query: select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa. In the Spark UI I can see 64 tasks, which use just 4 executors (each executor has 16 cores) out of 20. Is there a way to scale out the number of tasks, or is that how bucketed queries are supposed to run (as many running cores as there are buckets)?

Here's the create table:

sql("""CREATE TABLE level_1 (
 bundle string,
  date_ date,
 hour SMALLINT)
 USING ORC
 PARTITIONED BY (date_ , hour )
 CLUSTERED BY (ifa)
 SORTED BY (ifa)
 INTO 64 BUCKETS
 LOCATION 'XXX'""")

Here's the query:

sql(s"select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa").show
Jacek Laskowski
Modi

2 Answers

With bucketing, the number of tasks equals the number of buckets, so decide how many cores/tasks you need or want to use and set the number of buckets to match.
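
To make the numbers from the question concrete, here is a small sketch of the arithmetic. The variable names are illustrative only, and it assumes the tasks are packed onto as few executors as possible, which matches what the question reports seeing in the Spark UI.

```scala
// Figures taken from the question; the names are hypothetical.
val numBuckets = 64        // INTO 64 BUCKETS
val coresPerExecutor = 16
val numExecutors = 20

// A bucketed scan produces one task per bucket.
val scanTasks = numBuckets

// Executors needed to run all tasks at once, rounding up
// (assumes tasks are packed onto as few executors as possible).
val busyExecutors = math.ceil(scanTasks.toDouble / coresPerExecutor).toInt
val idleExecutors = numExecutors - busyExecutors

println(s"tasks=$scanTasks, busy executors=$busyExecutors, idle executors=$idleExecutors")
```

With these figures, 64 tasks fill 4 executors and the other 16 stay idle. To keep all 320 cores busy during the scan, the table would need at least 320 buckets.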

Modi
Does that mean that bucketing is not appropriate for datasets that grow over time? The number of buckets (and of files within, e.g., each date partition folder) stays the same when appending to such a dataset, so reading from this dataset (table) does not scale. – Melkor.cz Jan 09 '20 at 11:48

Number of tasks = number of buckets is probably the most important and under-discussed aspect of bucketing in Spark. Historically, buckets (by default) have been useful solely for creating "pre-shuffled" dataframes, which can optimize large joins. When you read a bucketed table, all of the files for each bucket are read by a single Spark task (30 buckets = 30 Spark tasks when reading the data), which allows the table to be joined to another table bucketed the same way (same columns, same number of buckets). I find this behavior annoying and, as the commenter above mentioned, problematic for tables that may grow.

You might be asking yourself now: why and when in the world would I ever want to bucket, and when will my real-world data grow in exactly the same way over time? (You probably partitioned your big data by date, be honest.) In my experience you probably don't have a great use case for bucketing tables in the default Spark way. BUT ALL IS NOT LOST FOR BUCKETING!

Enter "bucket pruning". Bucket pruning only works when you bucket on ONE column, but it is potentially your greatest friend in Spark since the advent of Spark SQL and DataFrames. It lets Spark determine which files in your table can contain specific values, based on a filter in your query, which can MASSIVELY reduce the number of files Spark physically reads and result in hugely efficient, fast queries. (I've taken 2+ hour queries down to 2 minutes and 1/100th of the Spark workers.) But you probably don't care, because the buckets-to-tasks issue means your table will never "scale up" if you have too many files per bucket, per partition.
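
The idea behind bucket pruning can be sketched in plain Scala. This is a conceptual illustration, not Spark's implementation: Spark uses its own Murmur3-based hash, for which `String.hashCode` stands in here, and the file names and the helpers `bucketOf` and `bucketIdOf` are hypothetical.

```scala
// Conceptual sketch only: String.hashCode stands in for Spark's internal hash.
val numBuckets = 64

// Map a value of the bucketing column to a bucket id in [0, numBuckets).
def bucketOf(value: String): Int =
  java.lang.Math.floorMod(value.hashCode, numBuckets)

// Hypothetical file listing: one file per bucket, with the bucket id
// encoded in the file name just before the extension.
val files = (0 until numBuckets).map(id => f"part-00000_$id%05d.orc")

// Recover the bucket id from a file name.
def bucketIdOf(fileName: String): Int =
  fileName.stripSuffix(".orc").split("_").last.toInt

// An equality filter on the bucketing column pins down a single bucket,
// so only the files belonging to that bucket need to be read.
val target = bucketOf("valuex")
val selected = files.filter(f => bucketIdOf(f) == target)

println(s"reading ${selected.size} of ${files.size} files: ${selected.mkString(", ")}")
```

An equality filter on the bucketing column narrows the scan to one bucket's files, here 1 file out of 64 per partition. That is where the large reduction in files read comes from.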

Enter Spark 3.2.0. There is a new feature coming that will allow bucket pruning to stay active when you disable bucket-based reading, so you can distribute the Spark reads while still benefiting from bucket pruning on the scan. I also have a trick for doing this with Spark < 3.2, as follows. (Note that the leaf-file scan done by a vanilla spark.read on S3 adds overhead, but if your table is big it doesn't matter, because your bucket-optimized table will be read in a distributed way across all your available Spark workers and will now be scalable.)

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD, FilePartition}

// Plan the filtered query so that Spark applies partition and bucket pruning
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

// Collect the paths of the files that survived pruning
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)

// Infer the file format from the extension of the first file
val format = bucket_files(0).split("\\.").last

// Re-read only those files as a plain (non-bucketed) source so the read is distributed
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))
Gabe Church