
I'm trying to improve a process in Spark SQL. I have two batch processes where the output of the first is the input of the second, and I need to keep them separate.

In my first process I have a table that Spark SQL has partitioned by a key. If I persist it to a datastore, Spark loses track of the hashing used for that table. Later I need to load that data in the other process and join it with other data, and the join key in the loaded data is the same as before. In that case Spark loads the data but, not knowing the hashing that was used when it was persisted, it redoes the shuffle to put the data into the expected Spark SQL partitions. Since the number of SQL partitions is the same in both processes and the key is the same too, I think this last shuffle is avoidable, but I don't know how.

In summary, I want a way to persist the data to an HDFS datastore that preserves the HashPartitioning Spark SQL applied by a key, so that subsequent reads can avoid that first shuffle. Or, in fewer words: I want to read a partitioned table while keeping track of the partition key of the table, to avoid shuffling.

Pseudocode of what I want to do:

val path = "/home/data"
ds.repartition(col("key")).write.parquet(path)
// in another Spark SQL process
sparkSession.read.parquet(path).repartition(col("key"))
// I know I need this last repartition,
// but how can I make it as efficient as possible?
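For concreteness, the join in the second process described above might look like this (otherDs and the exact join shape are hypothetical, just to show where the extra shuffle appears):

val fromDisk = sparkSession.read.parquet(path)
// both sides share the same key, but Spark has no knowledge of how the
// parquet files were hashed when they were written, so it shuffles fromDisk again
val joined = fromDisk.join(otherDs, Seq("key"))
joined.explain // expect an Exchange hashpartitioning(key, ...) on the parquet side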
Alfilercio

1 Answer


Basically, you want to persist a partitioned dataset to HDFS so that, when you read it back, Spark takes the way it is partitioned into account for further processing. For instance, say you have the following DataFrame:

import spark.implicits._ // for toDF
import org.apache.spark.sql.functions.{col, count, lit}

val df = sc.parallelize(Seq(
  1 -> 3, 1 -> 4, 2 -> 5,
  2 -> 7, 1 -> 10, 2 -> 7,
  3 -> 67, 3 -> 89, 3 -> 7
)).toDF("A", "B")

Then let's try a group by:

df.groupBy("A").agg(count(lit(1)) as "C").explain
== Physical Plan ==
*HashAggregate(keys=[A#41], functions=[count(1)])
+- Exchange hashpartitioning(A#41, 200)
   +- *HashAggregate(keys=[A#41], functions=[partial_count(1)])
      +- *Project [_1#38 AS A#41]
         +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#38, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#39]
            +- Scan ExternalRDDScan[obj#37]

The line Exchange hashpartitioning(A#41, 200) tells you there is going to be some shuffle.
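By way of comparison, a sketch: if the DataFrame is already hash-partitioned in memory on the grouping key, the aggregation can reuse that partitioning and should not need a further exchange:

df.repartition(col("A"))
  .groupBy("A")
  .agg(count(lit(1)) as "C")
  .explain
// the only Exchange in this plan should be the one introduced by repartition;
// the aggregation itself reuses that hash partitioning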

Now, if you persist your data to disk:

df
    .write
    .partitionBy("A")
    .parquet("hdfs:///path/dataset")

Then if you read it back and run an aggregation:

spark.read.parquet("data.parquet").agg(count(lit(1)) as "C").explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
    +- Exchange SinglePartition
       +- *HashAggregate(keys=[], functions=[partial_count(1)])
          +- *Project
             +- *FileScan parquet [A#78] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/olivier/data.parquet], PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Exchange SinglePartition tells you that no shuffle will be performed.

You can also filter your data on the "A" column if you don't need all of it, and Spark won't have to load the data it does not need. See Reading DataFrame from partitioned parquet file for more details.
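A sketch of such a filter, reusing the path written above; the predicate should show up under PartitionFilters in the FileScan, so only the matching directory is read:

spark.read.parquet("hdfs:///path/dataset")
  .filter(col("A") === 1)
  .explain
// the FileScan node should list the condition under PartitionFilters,
// meaning only the A=1 directory is scanned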

Oli
  • No, I want to read a partitioned table, keeping track of the partition key in the table, to avoid shuffling. – Alfilercio Jun 15 '18 at 07:27
  • I added a lot of details. If you have a partitioned dataframe persisted and read it, Spark will take it into account and avoid unnecessary shuffle. – Oli Jun 16 '18 at 08:39