
I have an Iceberg table like the one below and am trying to write a Trino query that produces the expected output.

Sample Data

trino:datalakepartncr_trino> select src.ID, src.OUTSTANDING1, src.OUTSTANDING2, src.OUTSTANDINGP, src.LASTACTION, src.LASTACTIONDATE, pos,changed_cols from table_event where src.ID in (4998,4952,4959) order by pos;
  ID  | OUTSTANDING1 | OUTSTANDING2 | OUTSTANDINGP | LASTACTION |   LASTACTIONDATE    |       pos       |                    changed_cols
------+--------------+--------------+--------------+------------+---------------------+-----------------+---------------------------------------------------------------
 4952 |          0.0 |            0 |            0 | NULL       | NULL                | 177240302252540 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4952 |         NULL |         NULL |         NULL | NULL       | 2022-12-15 13:30:02 | 177240302252701 | [ID, LASTACTION, LASTACTIONDATE]
 4952 |         NULL |         NULL |         NULL | ERA        | 2022-12-15 13:30:05 | 177240302255887 | [ID, LASTACTION, LASTACTIONDATE]
 4959 |       162.94 |            0 |            0 | NULL       | NULL                | 177240304204719 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4959 |        160.0 |            0 |            0 | NULL       | NULL                | 177240304207643 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4959 |       141.74 |            0 |            0 | NULL       | NULL                | 177240304209836 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4959 |         NULL |            0 |            0 | NULL       | NULL                | 177240304212737 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4959 |         NULL |         NULL |         NULL | CLOSE      | 2022-12-15 13:30:07 | 177240304212873 | [ID, LASTACTION, LASTACTIONDATE]
 4959 |         NULL |         NULL |         NULL | ERA        | 2022-12-15 13:30:07 | 177240304218096 | [ID, LASTACTION, LASTACTIONDATE]
 4959 |         NULL |         NULL |         NULL | NULL       | NULL                | 177240304219369 | [ID, LASTACTION, LASTACTIONDATE]
 4998 |         25.0 |            0 |            0 | NULL       | NULL                | 177330411767496 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4998 |         NULL |         NULL |         NULL | CREATE     | 2022-12-15 16:41:06 | 177330411767632 | [ID, LASTACTION, LASTACTIONDATE]
 4998 |         55.0 |            0 |            0 | NULL       | NULL                | 177330411775790 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
 4998 |         55.0 |            0 |            0 | NULL       | NULL                | 177330411781226 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]

The expected result should be the latest value of each column based on POS ordering, using changed_cols to differentiate NULLs in the data from NULLs introduced by Spark.

The table is created via Spark by reading from a JSON file. We are using the changed_cols field to differentiate NULLs that Spark generated because the field was not present in the event from actual NULLs in the data itself.

If you look at the fourth record for ID 4959, you can see that the NULL in column OUTSTANDING1 is not because the field was missing but because of the data itself, and that can be distinguished from the changed_cols values.
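
For example, whether a given NULL is "real" can be checked per row (a minimal sketch, assuming the same table_event schema as in the query above):

-- Sketch: a NULL in OUTSTANDING1 only counts as real data
-- when the column is listed in changed_cols for that event.
select src.ID,
       src.OUTSTANDING1,
       pos,
       contains(changed_cols, 'OUTSTANDING1') as outstanding1_was_changed
from table_event
where src.ID = 4959
order by pos;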

Expected output:


  ID  | OUTSTANDING1 | OUTSTANDING2 | OUTSTANDINGP | LASTACTION |   LASTACTIONDATE    |                    changed_cols
------+--------------+--------------+--------------+------------+---------------------+----------------------------------------------------------------------------
 4952 |          0.0 |            0 |            0 | ERA        | 2022-12-15 13:30:05 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP, LASTACTION, LASTACTIONDATE]
 4959 |         NULL |            0 |            0 | NULL       | NULL                | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP, LASTACTION, LASTACTIONDATE]
 4998 |         55.0 |            0 |            0 | NULL       | NULL                | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]

I am able to roll up the changed_cols and get the distinct values, but getting the latest value is not working.

trino:datalakepartncr_trino> select op_type,src.ID, lag(src.OUTSTANDING1) over (partition by src.ID order by pos),src.outstanding1, pos,array_distinct(flatten(array_agg(changed_cols) over (partition by src.ID order by pos))) from claim_event where src.ID in (4998,4952,4959) and op_type = 'U' order by op_ts, pos

Please provide some ideas on how to achieve this.

Additional sample JSON data to easily load and test

{"table":"TABLEA", "POS": 1, "ID": 244, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "POS": 2, "ID": 244, "COLUMNA": null, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "POS": 3, "ID": 244, "COLUMNA": 200, "COLUMNC": "CLOSE", "changed_cols": ["ID", "COLUMNA", "COLUMNC"]}
{"table":"TABLEA", "POS": 4, "ID": 244, "COLUMNA": null, "COLUMNB": "user", "COLUMNC": "INTERIM", "COLUMND": 35000, "changed_cols": ["ID", "COLUMNA", "COLUMNB", "COLUMNC", "COLUMND"]}
{"table":"TABLEA", "POS": 5, "ID": 244, "COLUMNB": "user", "COLUMNC": "OPEN", "changed_cols": ["ID", "COLUMNB", "COLUMNC"]}
{"table":"TABLEA", "POS": 6, "ID": 244, "COLUMNB": "user", "COLUMNC": null, "changed_cols": ["ID", "COLUMNB", "COLUMNC"]}
{"table":"TABLEA", "POS": 1, "ID": 245, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "POS": 2, "ID": 245, "COLUMNB": null, "changed_cols": ["ID", "COLUMNB"]}
{"table":"TABLEA", "POS": 3, "ID": 245, "COLUMNA": 200, "COLUMNC": "CLOSE", "changed_cols": ["ID", "COLUMNA", "COLUMNC"]}

Expected Output of sample JSON:

If we take the latest changed value from the above events for the primary key (ID), it will look like:

+---+-------+-------+-------+-------+----------------------------------------+
|ID |COLUMNA|COLUMNB|COLUMNC|COLUMND|COL_LIST                                |
+---+-------+-------+-------+-------+----------------------------------------+
|244|null   |user   |null   |35000  |[ID, COLUMNA, COLUMNC, COLUMNB, COLUMND]|
|245|200.0  |null   |CLOSE  |null   |[ID, COLUMNA, COLUMNB, COLUMNC]         |
+---+-------+-------+-------+-------+----------------------------------------+

1 Answer


If I understand correctly, you have "correct" data for a column only if the column name is present in changed_cols. One approach, a bit monstrous, is to use max_by so that only rows where the column is present in the changed_cols array are taken into account: the if function returns pos when the analyzed column is in the array and NULL otherwise, and max_by ignores rows whose second argument is NULL:

-- sample data
WITH dataset(ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP, LASTACTION, LASTACTIONDATE, pos, changed_cols) AS (
    values (4952,    0.0,    0,    0, NULL,     NULL,                  177240302252540, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4952,   NULL, NULL, NULL, NULL,     '2022-12-15 13:30:02', 177240302252701, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
           (4952,   NULL, NULL, NULL, 'ERA',    '2022-12-15 13:30:05', 177240302255887, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
           (4959, 162.94,    0,    0, NULL,     NULL,                  177240304204719, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4959,  160.0,    0,    0, NULL,     NULL,                  177240304207643, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4959, 141.74,    0,    0, NULL,     NULL,                  177240304209836, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4959,   NULL,    0,    0, NULL,     NULL,                  177240304212737, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4959,   NULL, NULL, NULL, 'CLOSE',  '2022-12-15 13:30:07', 177240304212873, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
           (4959,   NULL, NULL, NULL, 'ERA',    '2022-12-15 13:30:07', 177240304218096, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
           (4959,   NULL, NULL, NULL, NULL,     NULL,                  177240304219369, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
           (4998,   25.0,    0,    0, NULL,     NULL,                  177330411767496, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4998,   NULL, NULL, NULL, 'CREATE', '2022-12-15 16:41:06', 177330411767632, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
           (4998,   55.0,    0,    0, NULL,     NULL,                  177330411775790, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
           (4998,   55.0,    0,    0, NULL,     NULL,                  177330411781226, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP'])
)

-- query
select ID,
       max_by(OUTSTANDING1, if(contains(changed_cols, 'OUTSTANDING1'), pos)) OUTSTANDING1,
       max_by(OUTSTANDING2, if(contains(changed_cols, 'OUTSTANDING2'), pos)) OUTSTANDING2,
       max_by(LASTACTION, if(contains(changed_cols, 'LASTACTION'), pos)) LASTACTION,
       -- handle arrays:
       array_distinct(flatten(array_agg(changed_cols))) changed_cols
    -- ... the rest of needed columns
from dataset
group by ID;

Output:

  ID  | OUTSTANDING1 | OUTSTANDING2 | LASTACTION
------+--------------+--------------+------------
 4952 |         0.00 |            0 | ERA
 4998 |        55.00 |            0 | CREATE
 4959 |         NULL |            0 | NULL
  • If I just use a row_number-based ranking approach and filter for the latest row it won't work, as the output will be ```4952, NULL, NULL, NULL, ERA, 2022-12-15 13:30:05 4959, NULL, NULL, NULL, NULL, NULL 4998, 55.0, 0 , 0, NULL, NULL```. The main complexity here is to differentiate NULLs that originated from missing fields vs actual NULLs that are part of the data, and to give precedence accordingly by using the changed_cols field values – arunb2w Mar 22 '23 at 08:14
    @arunb2w ok. I will try to look into it later. – Guru Stron Mar 22 '23 at 08:16
  • I have updated the title to add more clarity, because that's what I'm expecting: to differentiate actual nulls that came in the data vs JSON nulls – arunb2w Mar 22 '23 at 10:12
  • @arunb2w see the changed answer. I've used the original data because it was easier for me to translate. – Guru Stron Mar 22 '23 at 18:41
  • Thanks for the answer. How do I roll up the distinct changed cols of all the rows in the output? I tried something like this but it is not working: ```select ID, max_by(OUTSTANDING1, if(contains(changed_cols, 'OUTSTANDING1'), pos)) OUTSTANDING1, max(array_distinct(flatten(array_agg(changed_cols) over (partition by ID order by pos)))) -- ... the rest of needed columns from dataset group by ID;``` – arunb2w Mar 23 '23 at 07:08
  • I was able to do something like this and it worked! select ID, OUTSTANDING1, OUTSTANDING2, array_distinct(flatten(changed_cols_agg)) from ( select ID, max_by(OUTSTANDING1, if(contains(changed_cols, 'OUTSTANDING1'), pos)) OUTSTANDING1, max_by(OUTSTANDING2, if(contains(changed_cols, 'OUTSTANDING2'), pos)) OUTSTANDING2, array_agg(changed_cols) changed_cols_agg -- ... the rest of needed columns from dataset group by ID ); – arunb2w Mar 23 '23 at 07:15
  • @arunb2w yes, also note that you don't need the extra select; you can do it "inline", i.e. `array_distinct(flatten(array_agg(changed_cols))) changed_cols_agg`. Also you can try just using `set_union` if your version supports it. – Guru Stron Mar 23 '23 at 07:24
  • @arunb2w and was glad to help! Updated answer to support arrays. P.S. if answer works for you - feel free to upvote and mark it as accepted one ;) – Guru Stron Mar 23 '23 at 07:27
  • Sure. Which versions support set_union? And which is more performant, array_agg with flatten or set_union? – arunb2w Mar 23 '23 at 07:37
  • @arunb2w I would guess that `set_union` should be at least as performant as the composite approach. As for versions, currently it is supported only by Presto 0.239+ (not by Trino; I created an issue there), but the easiest way to check is to just try substituting `set_union` into the current approach. – Guru Stron Mar 23 '23 at 07:40
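
Putting the comments together, a consolidated sketch against the dataset CTE from the answer: every column is resolved with the same max_by/contains pattern, and changed_cols is rolled up inline without an extra subquery:

-- Consolidated sketch (runs against the "dataset" CTE defined in the answer)
select ID,
       max_by(OUTSTANDING1,   if(contains(changed_cols, 'OUTSTANDING1'),   pos)) OUTSTANDING1,
       max_by(OUTSTANDING2,   if(contains(changed_cols, 'OUTSTANDING2'),   pos)) OUTSTANDING2,
       max_by(OUTSTANDINGP,   if(contains(changed_cols, 'OUTSTANDINGP'),   pos)) OUTSTANDINGP,
       max_by(LASTACTION,     if(contains(changed_cols, 'LASTACTION'),     pos)) LASTACTION,
       max_by(LASTACTIONDATE, if(contains(changed_cols, 'LASTACTIONDATE'), pos)) LASTACTIONDATE,
       -- inline rollup of every column that ever appeared in changed_cols per ID
       array_distinct(flatten(array_agg(changed_cols))) changed_cols
from dataset
group by ID;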