
I have two Hive clustered tables, t1 and t2:

CREATE EXTERNAL TABLE `t1`(
   `t1_req_id` string,
    ...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS

// t2 looks similar, with the same number of buckets

The code looks like the following:

val t1 = spark.table("t1").as[T1].rdd.map(v => (v.t1_req_id, v))
val t2 = spark.table("t2").as[T2].rdd.map(v => (v.t2_req_id, v))

val outRdd = t1.cogroup(t2)
  .flatMap { coGroupRes =>
    val key = coGroupRes._1
    val value: (Iterable[T1], Iterable[T2]) = coGroupRes._2
    val t3List = ??? // create a list with some logic on Iterable[T1] and Iterable[T2]
    t3List
  }
outRdd.write....

I make sure that both the t1 and t2 tables have the same number of partitions, and on spark-submit I pass the
spark.sql.sources.bucketing.enabled=true and spark.sessionState.conf.bucketingEnabled=true flags.

But the Spark DAG doesn't show any impact of the clustering; it seems there is still a full data shuffle. What am I missing? Are there any other configurations or tunings? How can I make sure there is no full data shuffle? My Spark version is 2.3.1.

[Spark DAG screenshot]
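One way to check from code whether a full shuffle is planned (a minimal sketch, reusing the table and column names above) is to inspect the physical plan of the equivalent DataFrame join for Exchange operators:

    // Sketch: when Spark recognizes the bucketing and both tables have matching
    // bucket counts, you would expect no Exchange (shuffle) nodes feeding the
    // SortMergeJoin in the physical plan.
    val t1df = spark.table("t1")
    val t2df = spark.table("t2")

    t1df.join(t2df, t1df("t1_req_id") === t2df("t2_req_id"))
      .explain() // look for "Exchange hashpartitioning(...)" in the output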

Julias

1 Answer


And it shouldn't show.

Any logical optimizations are limited to the DataFrame API. Once you push data into the black-box functional Dataset API (see Spark 2.0 Dataset vs DataFrame) and later into the RDD API, no more information is pushed back to the optimizer.
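A small sketch of where that boundary sits (a hedged illustration, reusing the table and column names from the question; whether the Exchange actually disappears depends on Spark recognizing the table's bucketing):

    import spark.implicits._

    // Still a DataFrame: Catalyst can see that t1 is bucketed by t1_req_id into
    // 1000 buckets, so an aggregation or join on that column may be planned
    // without an Exchange.
    val df = spark.table("t1")
    df.groupBy($"t1_req_id").count().explain()

    // After .rdd the data is an opaque RDD of rows: the bucketing metadata is
    // not carried over, so a subsequent cogroup/join on the RDD side will
    // shuffle both inputs.
    val rdd = df.rdd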

You could partially utilize bucketing by performing the join first, with something along these lines:

spark.table("t1")
   .join(spark.table("t2"), $"t1.t1_req_id" === $"t2.t2_req_id", "outer")
   .groupBy($"t1.t1_req_id", $"t2.t2_req_id")
   .agg(...) // For example collect_set($"t1.v"), collect_set($"t2.v")

However, unlike cogroup, this will generate a full Cartesian product within each group, so it might not be applicable in your case.
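For illustration, a concrete (hypothetical) shape of the agg step, collecting whole rows per key as structs, could look like the following; it stays entirely in the DataFrame API, so the bucketing information remains visible to the planner:

    import spark.implicits._
    import org.apache.spark.sql.functions.{collect_set, struct}

    spark.table("t1")
      .join(spark.table("t2"), $"t1.t1_req_id" === $"t2.t2_req_id", "outer")
      .groupBy($"t1.t1_req_id", $"t2.t2_req_id")
      .agg(
        collect_set(struct($"t1.*")).as("t1_rows"), // all matching t1 rows for this key
        collect_set(struct($"t2.*")).as("t2_rows")  // all matching t2 rows for this key
      )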

user10938362
  • Thank you very much for the explanation. The reason I moved to RDDs is that I need to join 5 tables on the same key, with different numbers of events in each table, and based on the event data also create some meta counters. It is a kind of conceptual map-reduce, but I wanted to avoid the full shuffle by preparing clustered inputs. – Julias May 11 '20 at 19:29