I am running a Spark job that inserts data into ClickHouse through the ClickHouse JDBC driver. In a single-node setup, everything works as expected. But when I run the same job on a multi-node cluster [no replication, 2 shards] and then run a SELECT count() on the table, I get 2x the number of rows I inserted.
Here's the exact setup:
Machine 1 - master node with only the Distributed table
Create Table Query -
CREATE TABLE trial.illogs (userId String, country String, date String, domain String, pathname String, platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String, clientIp String, receivedDate Date)
ENGINE = Distributed('trial-cluster', '', 'illogs', rand())
Machine 2 & Machine 3 - Shard with MergeTree Table
Create Table Query -
CREATE TABLE trial.illogs (userId String, country String, date String, domain String, pathname String, platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String, clientIp String, receivedDate Date)
ENGINE = MergeTree
ORDER BY receivedTs
TTL receivedDate + toIntervalMonth(1)
SETTINGS index_granularity = 8192
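(I ran this CREATE on each shard by hand. As an aside, with ZooKeeper/ClickHouse Keeper available for distributed DDL, the same table could be created everywhere in one statement; a minimal sketch, reusing the cluster name from the config.xml below:)

-- sketch only: ON CLUSTER requires ZooKeeper/Keeper configured for distributed DDL
CREATE TABLE trial.illogs ON CLUSTER 'trial-cluster' (userId String, country String, date String, domain String, pathname String, platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String, clientIp String, receivedDate Date)
ENGINE = MergeTree
ORDER BY receivedTs
TTL receivedDate + toIntervalMonth(1)
SETTINGS index_granularity = 8192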
Here's the remote_servers section of config.xml for the shard setup:
<remote_servers>
    <trial-cluster>
        <shard>
            <replica>
                <default_database>trial</default_database>
                <host>node1</host>
                <port>9000</port>
                <password>zxcv</password>
            </replica>
        </shard>
        <shard>
            <replica>
                <default_database>trial</default_database>
                <host>node2</host>
                <port>9000</port>
                <password>zxcv</password>
            </replica>
        </shard>
    </trial-cluster>
</remote_servers>
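For reference, the topology ClickHouse actually loaded can be checked on the master node with a query like the one below; I'd expect exactly two rows, one per shard:

-- shows the cluster members as ClickHouse sees them
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'trial-cluster'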
Here's the Spark job that inserts into ClickHouse [I've included just the write part to keep it short]:
df
    .write()
    .format("jdbc")
    .mode(SaveMode.Append)
    .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    .option("url", jdbcUrl)
    .option("user", "default")
    .option("password", pass)
    .option("ssl", "false")
    .option("createTableOptions", createTableOptions)
    .option("dbtable", tableName)
    .option("truncate", "true")
    .option("batchsize", batchSize)
    .option("numPartitions", maxWritePartitions)
    .option("isolationLevel", "NONE")
    .save();
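(jdbcUrl points at the master node carrying the Distributed table, something like jdbc:clickhouse://<master-host>:8123/trial; the host and port here are placeholders.)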
When I ran the same Spark job against the single-node setup, the DataFrame's count matched what was in ClickHouse. But with 2 shards, the count in ClickHouse is exactly 2x the DataFrame's count.
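To see where the duplicates live, the local tables on both shards can be counted in a single query from the master (names as in the setup above):

-- counts the rows in each shard's local trial.illogs, labelled by host
SELECT hostName() AS host, count() AS rows
FROM cluster('trial-cluster', trial.illogs)
GROUP BY host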
One option is to split the data in Spark and insert into each node separately, but I'd rather not handle the sharding logic myself: I want to insert directly into the Distributed table and let ClickHouse handle the distribution.