I have this Hive MERGE statement:

MERGE INTO destination dst
USING (
  SELECT

   -- DISTINCT fields
      company
    , contact_id as id
    , ct.cid as cid

     -- other fields
    , email
    , timestamp_utc
    -- there are actually about 6 more 

    -- deduplication
    , ROW_NUMBER() OVER (
         PARTITION BY company
       , ct.cid
       , contact_id
         ORDER BY timestamp_utc DESC
    ) as r

  FROM
    source
  LATERAL VIEW explode(campaign_id) ct AS cid
) src
ON
        dst.company = src.company
    AND dst.campaign_id = src.cid
    AND dst.id = src.id

-- On match: keep latest loaded
WHEN MATCHED
    AND dst.updated_on_utc < src.timestamp_utc
    AND src.r = 1
THEN UPDATE SET
    email = src.email
  , updated_on_utc = src.timestamp_utc

WHEN NOT MATCHED AND src.r = 1 THEN INSERT VALUES (
    src.id
  , src.email
  , src.timestamp_utc
  , src.license_name
  , src.cid
)
;

This statement runs for a very long time (about 30 minutes for 7 GB of Avro-compressed data on disk), and I wonder whether there are any SQL-level ways to improve it.

ROW_NUMBER() is here to deduplicate the source table, so that the MATCHED clause only sees the latest row for each key.

One thing I am not sure about is this warning from Hive:

SQL Standard requires that an error is raised if the ON clause is such that more than 1 row in source matches a row in target. This check is computationally expensive and may affect the overall runtime of a MERGE statement significantly. hive.merge.cardinality.check=false may be used to disable the check at your own risk. If the check is disabled, but the statement has such a cross join effect, it may lead to data corruption.

I do indeed disable the cardinality check: although the ON clause can match more than one source row, those extra rows are eliminated by the r = 1 condition later in the MATCHED clause.
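
For reference, the check is disabled with the property named in that warning:

SET hive.merge.cardinality.check=false;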

Overall I like this MERGE statement, but it is just too slow, and any help would be appreciated.

Note that the destination table is partitioned. The source table is not, as it is an external table that must be fully merged, and therefore fully scanned, on every run (in the background, already-merged data files are removed and new files are added before the next run). I am not sure partitioning would help in that case.

What I have done:

  • played with the HDFS/Hive/YARN configuration
  • tried a temporary table (2 steps) instead of a single MERGE; the run time jumped to more than 2 hours.
Guillaume

1 Answer

Option 1: Move the filter where src.r = 1 inside the src subquery and check the merge performance. This will reduce the number of source rows before the merge.
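
A sketch of that change, with the column list abbreviated as in the question (only the USING part of the MERGE changes; the AND src.r = 1 conditions in the WHEN clauses then become redundant):

USING (
  SELECT * FROM (
    SELECT
        company
      , contact_id as id
      , ct.cid as cid
      , email
      , timestamp_utc
      -- other fields
      , ROW_NUMBER() OVER (
            PARTITION BY company, ct.cid, contact_id
            ORDER BY timestamp_utc DESC
        ) as r
    FROM source
    LATERAL VIEW explode(campaign_id) ct AS cid
  ) t
  WHERE t.r = 1
) src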

The other two options do not require ACID mode; they do a full rewrite of the target.

Option 2: Rewrite using UNION ALL + row_number (this should be the fastest one):

insert overwrite table destination 
select --keep only the latest row per (company, cid, id)
  company
, id
, cid
, email
, timestamp_utc
-- add more fields 
from
(
select --number the rows, last updated first
  s.*
, ROW_NUMBER() OVER (PARTITION BY company, cid, id ORDER BY timestamp_utc DESC) as rn
from
(
select --union all source and target
  company
, contact_id as id
, ct.cid as cid
, email
, timestamp_utc
-- add more fields 
from source LATERAL VIEW explode(campaign_id) ct AS cid
UNION ALL
select 
  company
, id
, campaign_id as cid
, email
, updated_on_utc as timestamp_utc
-- add more fields 
from destination
)s --union all
)s --dedupe
where rn=1 --filter duplicates

If the source contains a lot of duplicates, you can also apply row_number filtering to the source subquery before the union.
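
For instance, the source branch of the UNION ALL above could be pre-deduplicated like this before the union (same assumed columns as in Option 2):

select company, id, cid, email, timestamp_utc -- add more fields
from
(
select
  company
, contact_id as id
, ct.cid as cid
, email
, timestamp_utc
, ROW_NUMBER() OVER (PARTITION BY company, ct.cid, contact_id ORDER BY timestamp_utc DESC) as rn
from source LATERAL VIEW explode(campaign_id) ct AS cid
)s
where rn=1
-- then UNION ALL the destination rows as in Option 2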

Option 3: One more approach, using a full join: https://stackoverflow.com/a/37744071/2700344
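
Sketched against the tables in this question, that full-join variant looks roughly like this (column names are taken from the MERGE above; src_dedup is a placeholder for the already-deduplicated source):

insert overwrite table destination
select
  coalesce(s.company, d.company) as company
, coalesce(s.id, d.id) as id
, coalesce(s.cid, d.campaign_id) as campaign_id
-- take the source value only when it is newer, as in the MERGE
, case when s.id is not null and (d.id is null or d.updated_on_utc < s.timestamp_utc)
       then s.email else d.email end as email
, case when s.id is not null and (d.id is null or d.updated_on_utc < s.timestamp_utc)
       then s.timestamp_utc else d.updated_on_utc end as updated_on_utc
-- add more fields
from destination d
full outer join src_dedup s -- placeholder: the deduplicated source (r = 1 rows only)
  on  d.company = s.company
  and d.campaign_id = s.cid
  and d.id = s.id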

leftjoin
  • Thanks, I am running performance tests with your different options. I wonder about the insert overwrite: what would happen if the destination table (partition) has millions of rows? I would have thought that merging (e.g.) 100k rows into 100M rows would be faster than insert-overwriting 100M rows? – Guillaume Oct 09 '17 at 06:11
  • @Guillaume Please share your results. I'm using second option on Hive 1.2. – leftjoin Oct 09 '17 at 09:04
  • option in the question: ~25-30 minutes, your option 1 (src.r=1 in subquery): 30-35 minutes, your option 2 (no ACID, insert overwrite): ~32 minutes, temporary table: 2h+. How big are your source/destination tables? – Guillaume Oct 09 '17 at 09:51
  • ~50M runs about 10 min. – leftjoin Oct 09 '17 at 10:36
  • Are you running Tez or MR? I'm running on Tez – leftjoin Oct 09 '17 at 10:37
  • @Guillaume Try to tune this parameter: hive.exec.reducers.bytes.per.reducer=67108864; decreasing this value will increase the number of reducers (more parallelism). Need the full log to understand what exactly is running slow. – leftjoin Oct 09 '17 at 10:42
  • running Tez, source is 7GB over 23M rows. I already have about 450 reducers for 10 tez containers which can run in parallel, so increasing parallelism here will not help I think. Something that does not help is the explode and a big field with a complex structure. – Guillaume Oct 09 '17 at 11:19
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/156280/discussion-between-leftjoin-and-guillaume). – leftjoin Oct 09 '17 at 16:05