
I have an Iceberg table created with

CREATE TABLE catalog.db.table (a int, b int) USING iceberg

Then I apply some sort order on it

ALTER TABLE catalog.db.table WRITE ORDERED BY (a, b)

After running that command, SHOW TBLPROPERTIES catalog.db.table shows the write.distribution-mode: range property:

|sort-order             |a ASC NULLS FIRST, b ASC NULLS FIRST|
|write.distribution-mode|range                               |

Now I'm writing data into the table:

df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()

I expected this to create a single task in Spark, which would sort all the data in the dataframe (in fact it has been sorted since creation) and then write it into the table as a single file.

Unfortunately, at write time Spark decides to repartition all the data, which causes a shuffle. I believe this happens because of write.distribution-mode: range, which was set automatically.

== Physical Plan ==
AppendData (6)
+- * Sort (5)
   +- Exchange (4)    # :(
      +- * Project (3)
         +- Coalesce (2)
            +- * Scan ExistingRDD (1)

Is there a way to insert new data but also avoid unwanted shuffling?

Ixanezis
  • Is `coalesce(1)` just a hypothetical example? In practice you will probably have multiple (dataframe) partitions, hence shuffle cannot be avoided – shay__ Dec 31 '22 at 17:14
  • @shay__ yes, just a hypothetical example. Nevertheless, I don't see why multiple dataframe partitions lead to that shuffle cannot be avoided. – Ixanezis Jan 01 '23 at 11:11
  • Because the sorting is **global** – shay__ Jan 01 '23 at 11:38
  • Why would we want to have an entire dataframe sorted globally? Different partitions get written independently into one or more (usually parquet) files, ensuring each one is sorted. Different files within the entire table are not guaranteed to be sorted with respect to one another. – Ixanezis Jan 01 '23 at 14:24
  • Obviously, if the table is partitioned, then the sorting is global **per partition**. In both cases - partitioned or not - you have to shuffle the data. – shay__ Jan 02 '23 at 15:15
  • Sorry, I was distracted and thought you were talking about table partitions. Sorted table refers to global sorting, not locally per file. – shay__ Jan 02 '23 at 18:13

1 Answer


According to the Apache Iceberg docs, WRITE ORDERED BY does the following:

Iceberg tables can be configured with a sort order that is used to automatically sort data that is written to the table in some engines. For example, MERGE INTO in Spark will use the table ordering.

Now, you create and write your table with the following:

df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()

Sorting a dataframe globally requires a shuffle operation. You've used sortWithinPartitions, which does sort your data, but only within each partition. So this does not perform the full sort that your Iceberg table's write order requires.

Hence, you need another full shuffle operation to do your complete sort.
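If sorting within each file is enough for your use case, a sketch of the alternative (assuming the Iceberg Spark SQL extensions are enabled on your session) would be to switch the table to a local sort order, or to keep the sort order but override the distribution mode:

```sql
-- Sort only within each task/file; no range exchange is required.
ALTER TABLE catalog.db.table WRITE LOCALLY ORDERED BY (a, b);

-- Alternatively, keep the global sort order declared on the table
-- but disable the range distribution that triggers the shuffle:
ALTER TABLE catalog.db.table
SET TBLPROPERTIES ('write.distribution-mode' = 'none');
```

Either option should make the Exchange node disappear from the write plan, at the cost of files whose key ranges may overlap.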

Koedlt
  • The docs also say `WRITE ORDERED BY sets a global ordering where rows are ordered across tasks`. I don't see any evidence that full sort is actually required before writing into the table. Rather, I believe we just need to have sorted dataframe partitions. – Ixanezis Jan 01 '23 at 11:14
  • 1
    Looks like we're interpreting the docs differently. To me, setting a *global ordering* where rows are ordered *across* tasks sounds exactly like needing a full sort. A little further, the docs talk about `LOCALLY ORDERED BY`, which only makes you sort *within* tasks, not *across*. Maybe you're looking for that? – Koedlt Jan 02 '23 at 07:11
  • Yes, maybe you are right. I was thinking `WRITE ORDERED` to behave as `WRITE LOCALLY ORDERED`, and I needed the latter I guess. Thank you. However, I'm not really sure about real use cases for the `WRITE ORDERED`. – Ixanezis Jan 05 '23 at 11:26
  • 1
    @Ixanezis sorted tables almost always refer to globally sorted, so be sure there are enough "real use cases" :) – shay__ Jan 07 '23 at 09:47
  • For example, if you filter the sorted column on some range of values, Iceberg knows exactly which files are relevant just by looking at the metadata, instead of having to scan all the files (in case of local sorting) – shay__ Jan 07 '23 at 09:51
  • @shay__ "sorted tables almost always refer to globally sorted" - yes, but it appears that even though you may call it so, iceberg does not maintain the real global sorting across files after multiple commits into the table. They can still overlap. – Ixanezis Jan 24 '23 at 21:52
  • @shay__ I am not sure what the difference between global and local sort is. Local, I understand, means sorted within a partition, but what does global mean here and how is it different? Any example to explain global sorting? – Atif May 27 '23 at 16:22
  • @Atif do not confuse table partitions with Spark partitions (tasks). Locally sorted means that every file **on its own** will be sorted internally, that is, every Spark task will make sure to sort its data. Global sorting makes sure that the data is sorted **across tasks**, not just internally. For example, in local sort you can have one file with values (1,3,5) and another with (2,4,6). Global sort will produce (1,2,3) and (4,5,6). All the above is per **one Spark job** (or write action). – shay__ May 27 '23 at 16:56
  • Thanks @shay__ I am referring to Iceberg partitions and Iceberg table sorting. How will data be stored in Iceberg in the case of global and local sort? What are the use cases of each? – Atif May 27 '23 at 18:09
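The local-vs-global distinction from the comment above can be illustrated without Spark at all. This is a toy sketch in plain Python, using hypothetical data, where each inner list stands for one task's output file:

```python
# Six rows split across two "tasks" (Spark partitions).
rows = [5, 1, 3, 6, 2, 4]
task_a, task_b = rows[:3], rows[3:]

# Local sort (sortWithinPartitions / WRITE LOCALLY ORDERED BY):
# task boundaries are fixed first, then each task sorts its own rows.
# Each file is internally sorted, but their key ranges overlap.
locally_sorted = [sorted(task_a), sorted(task_b)]
print(locally_sorted)   # [[1, 3, 5], [2, 4, 6]]

# Global sort (WRITE ORDERED BY, distribution-mode=range):
# all rows are range-partitioned first (this is the shuffle),
# then sorted, so files hold disjoint, non-overlapping ranges.
all_rows = sorted(rows)
globally_sorted = [all_rows[:3], all_rows[3:]]
print(globally_sorted)  # [[1, 2, 3], [4, 5, 6]]
```

With non-overlapping ranges per file, Iceberg's file-level min/max metadata lets a range filter skip whole files, which is the use case shay__ describes.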