
I am developing a simple API in ClickHouse that continuously calculates the number of distinct users for a specific key.

This environment has 2 tables and 1 materialized view:

  • the first table, init_table, repeatedly receives batches of data.
  • the second table, final_table, counts the number of distinct users (by user_id) for a key composed of 2 elements, hash_id and item1.
  • the calculation is propagated from the init_table to the final_table by a materialized view.

Here is the code for the creation of tables and materialized view:

-- Init table
-- Table where data is continuously inserted in batches
DROP TABLE IF EXISTS test_db.init_table;
CREATE TABLE test_db.init_table (
    `timestamp` DateTime DEFAULT now(),
    `hash_id` FixedString(32),
    `item1` UInt32,
    `user_id` UInt32,
    `data1` String,
    `data2` String
)   ENGINE = MergeTree()
    PARTITION BY tuple()
    ORDER BY ( hash_id, item1 )
    SETTINGS index_granularity = 8192;

-- Final table
DROP TABLE IF EXISTS test_db.final_table;
CREATE TABLE test_db.final_table (
    `timestamp` DateTime,
    `hash_id` FixedString(32),
    `item1` UInt32,
    `nb_user` UInt32
)   ENGINE = ReplacingMergeTree( timestamp )
    PARTITION BY tuple()
    ORDER BY ( hash_id, item1 )
    SETTINGS index_granularity = 8192;

-- Automating calculation from init table to final table
DROP TABLE IF EXISTS test_db.final_table_mv;
CREATE MATERIALIZED VIEW test_db.final_table_mv TO test_db.final_table AS
    SELECT
        timestamp,
        hash_id,
        item1,
        uniqExact( hash_id ) as nb_user
    FROM test_db.init_table
    GROUP BY ( timestamp, hash_id, item1 );

In this case, the engine used to store the aggregated rows is ReplacingMergeTree, with the data-insertion timestamp as its version parameter: during merges, rows that share the same (hash_id, item1) sorting key are deduplicated, and only the row with the greatest timestamp is kept.
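
For context, here is a minimal sketch of that version semantics on a toy table (the table and values below are illustrative only, not part of the setup):

-- Hypothetical demo table: ReplacingMergeTree(ver) keeps, per ORDER BY key,
-- only the row with the greatest `ver` once parts are merged.
CREATE TABLE test_db.rmt_demo (k UInt32, v UInt32, ver DateTime)
ENGINE = ReplacingMergeTree(ver) ORDER BY k;

INSERT INTO test_db.rmt_demo VALUES (1, 10, '2020-07-24 07:00:00'), (1, 20, '2020-07-24 08:00:00');

-- FINAL forces the deduplication at query time; only (1, 20, ...) remains.
SELECT * FROM test_db.rmt_demo FINAL;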

Data insert queries:

-- Data insertion
INSERT INTO test_db.init_table (hash_id,item1,user_id,data1,data2) VALUES ('564D6CE91699BC0174BED61EBA966A55',1,4444,'gnr','fbj'), ('564D6CE91699BC0174BED61EBA966A55',1,1111,'fhi','jdi'), ('564D6CE91699BC0174BED61EBA966A55',1,3333,'hvn','fhi');
SELECT sleep(2);
INSERT INTO test_db.init_table (hash_id,item1,user_id,data1,data2) VALUES ('564D6CE91699BC0174BED61EBA966A55',1,4444,'gnr','fbj'), ('61215DE218CC92BD74D82D2511EAC4CC',1,4444,'jbj','dhi'), ('5CC905405307AA837D943C266C84ECE9',1,4444,'vhi','bjh');
SELECT sleep(2);
INSERT INTO test_db.init_table (hash_id,item1,user_id,data1,data2) VALUES ('5CC905405307AA837D943C266C84ECE9',1,1111,'bjd','dic'), ('564D6CE91699BC0174BED61EBA966A55',1,1111,'fhi','jdi'), ('19DC7D744DD74D4BD15C298C118E72B7',1,3333,'hfj','bjd'), ('564D6CE91699BC0174BED61EBA966A55',1,3333,'hvn','fhi'), ('BAB3B080B7DF54D0831DC077F203673A',1,3333,'jij','vbj'), ('DED51D04E97D621780FC54580A9DA77B',1,1111,'vbj','hcn');
SELECT sleep(2);
INSERT INTO test_db.init_table (hash_id,item1,user_id,data1,data2) VALUES ('564D6CE91699BC0174BED61EBA966A55',1,5555,'fbj','jdh'), ('8C48E3B8888EB3C37B269B2D6A2A5206',1,5555,'dhi','vjs'), ('DED51D04E97D621780FC54580A9DA77B',1,5555,'bjh','jks');
SELECT sleep(2);
INSERT INTO test_db.init_table (hash_id,item1,user_id,data1,data2) VALUES ('564D6CE91699BC0174BED61EBA966A55',1,6666,'dic','msk'), ('3E33205D3367E2B9A3DB2F73A8CEF077',1,6666,'jdi','xok'), ('702893A3E0A402776BFCC3E7A4BF5F77',1,6666,'hcn','lxs');

After inserting a few batches into the init_table, the number of users shown in the final_table is an aggregation of user_id per inserted batch, not over the whole content of the init_table.

-- Testing data
-- Number of distinct user_id in the init_table
select count(distinct user_id) from test_db.init_table where hash_id = '564D6CE91699BC0174BED61EBA966A55'; 
-- n = 5 --> this should be the right answer

-- Content of the final_table filtering on hash_id 564D6CE91699BC0174BED61EBA966A55
select * from test_db.final_table where hash_id = '564D6CE91699BC0174BED61EBA966A55' order by timestamp;
-- timestamp                hash_id                                 item1   nb_user
-- 2020-07-24 07:19:26      '564D6CE91699BC0174BED61EBA966A55'      1       3
-- 2020-07-24 07:19:28      '564D6CE91699BC0174BED61EBA966A55'      1       1
-- 2020-07-24 07:19:31      '564D6CE91699BC0174BED61EBA966A55'      1       2
-- 2020-07-24 07:19:33      '564D6CE91699BC0174BED61EBA966A55'      1       1
-- 2020-07-24 07:19:36      '564D6CE91699BC0174BED61EBA966A55'      1       1

-- Result after merging the data (FINAL forces the ReplacingMergeTree deduplication at query time)
select * from test_db.final_table FINAL where hash_id = '564D6CE91699BC0174BED61EBA966A55' order by timestamp;
-- timestamp                hash_id                                 item1   nb_user
-- 2020-07-24 07:19:36      '564D6CE91699BC0174BED61EBA966A55'      1       1

So, the final result I have here is not the number of distinct user_id values present in the init_table, but the number of distinct user_id values in the last batch inserted into the init_table.

What I would like in the final_table is the total number of distinct user_id values in the init_table, grouped by hash_id and item1 (the key), like this:

hash_id                                 item1   nb_user
'564D6CE91699BC0174BED61EBA966A55'      1       5

With 5 being the total number of distinct user_id values in the init_table, given the batches inserted above.

I also tried some other engines such as MergeTree and AggregatingMergeTree, without success. What am I doing wrong? Do you have any suggestions?

Ludo

1 Answer


It looks like there is an error in test_db.final_table_mv: instead of uniqExact( hash_id ) as nb_user you need to use uniqExact( user_id ) as nb_user.

In general, your code is error-prone because some aggregated values can be lost after ReplacingMergeTree merges rows.
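
For example, forcing an unscheduled merge makes the loss visible immediately (a sketch against your original schema):

-- Force a merge of final_table parts:
OPTIMIZE TABLE test_db.final_table FINAL;

-- Only one row per (hash_id, item1) key survives, the one with the greatest
-- `timestamp`, so the counts computed from earlier batches are discarded:
SELECT * FROM test_db.final_table
WHERE hash_id = '564D6CE91699BC0174BED61EBA966A55';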


I would simplify your code by using one table of raw data and one aggregating MV that calculates intermediate states. An intermediate state makes it possible to calculate aggregations for any combination of dimensions (see the Queries below).

CREATE TABLE init_table (
  /* same definition as your original init_table */
) /* ... */;


CREATE MATERIALIZED VIEW aggregates_mv
ENGINE = AggregatingMergeTree()
PARTITION BY tuple()
ORDER BY (hash_id, item1) 
AS
SELECT  
  hash_id, 
  item1,
  uniqExactState(user_id) AS nb_user
FROM init_table
GROUP BY hash_id, item1;

Queries:

SELECT hash_id, item1, uniqExactMerge(nb_user) AS nb_user
FROM aggregates_mv
GROUP BY hash_id, item1;
/*
┌─hash_id──────────────────────────┬─item1─┬─nb_user─┐
   ...
│ 564D6CE91699BC0174BED61EBA966A55 │     1 │       5 │
   ...
└──────────────────────────────────┴───────┴─────────┘
*/

SELECT hash_id, uniqExactMerge(nb_user) AS nb_user
FROM aggregates_mv
GROUP BY hash_id;
/*
┌─hash_id──────────────────────────┬─nb_user─┐
   ...
│ 564D6CE91699BC0174BED61EBA966A55 │       5 │
   ... 
└──────────────────────────────────┴─────────┘
*/

SELECT item1, uniqExactMerge(nb_user) AS nb_user
FROM aggregates_mv
GROUP BY item1;
/*
┌─item1─┬─nb_user─┐
│     1 │       5 │
└───────┴─────────┘
*/
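
If you prefer to read a plain numeric column instead of repeating the -Merge call, the finalizing query can be wrapped in an ordinary view (a sketch; the view name is illustrative):

CREATE VIEW nb_user_v AS
SELECT hash_id, item1, uniqExactMerge(nb_user) AS nb_user
FROM aggregates_mv
GROUP BY hash_id, item1;

-- then simply:
SELECT * FROM nb_user_v WHERE hash_id = '564D6CE91699BC0174BED61EBA966A55';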
vladimir
  • Thank you @vladimir for your answer. Indeed, the intermediate state seems to be a good solution for this case. But what about having several tables/MVs/views that depend on each other? For example, what if I want to add a table after the MV you proposed that continues calculating things; is it possible to chain from this MV? – Ludo Jul 24 '20 at 11:35
  • @Ludo no problem, I am glad to help you. Yes, an MV can be used in any case. An MV can use the [POPULATE](https://clickhouse.tech/docs/en/sql-reference/statements/create/view/#materialized) option, which processes all the data already stored in the origin table (take into account that during 'population' the MV will ignore data inserted at the same time, so you need to pause input data ingestion until 'populating' is completed). – vladimir Jul 24 '20 at 11:49
  • @Ludo As an alternative, you can omit the *POPULATE* option and save the required data to the MV manually using *INSERT* (see the sketch after these comments). – vladimir Jul 24 '20 at 11:51
  • Ok, thank you for your answers. I'll run some tests with these recommendations. I have another question regarding the use of a JOIN statement within MVs, given that an MV is triggered only by inserts into the left table of a JOIN, but I think I will create another question for that one. Have a good weekend @vladimir – Ludo Jul 24 '20 at 12:21
  • @Ludo thanks, look at this one - https://stackoverflow.com/questions/51233488/update-materialized-view-with-join-statement – vladimir Jul 24 '20 at 12:25
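
A minimal sketch of both backfill options discussed in the comments, reusing the aggregates_mv definition from the answer (the POPULATE keyword sits between the engine clause and AS SELECT; inserting into the MV is assumed to write to its inner storage table):

-- Option 1: create the MV with POPULATE so that rows already stored in
-- init_table are processed once at creation time (pause ingestion while
-- this runs, since concurrent inserts are ignored during population):
CREATE MATERIALIZED VIEW aggregates_mv
ENGINE = AggregatingMergeTree()
PARTITION BY tuple()
ORDER BY (hash_id, item1)
POPULATE
AS
SELECT hash_id, item1, uniqExactState(user_id) AS nb_user
FROM init_table
GROUP BY hash_id, item1;

-- Option 2: create the MV without POPULATE, then backfill the existing
-- data manually with an INSERT ... SELECT of the same aggregate states:
INSERT INTO aggregates_mv
SELECT hash_id, item1, uniqExactState(user_id) AS nb_user
FROM init_table
GROUP BY hash_id, item1;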