I'm trying to read data from GCS, aggregate it, and insert the results into ClickHouse Cloud's SharedMergeTree tables.
I've come across inconsistent behaviour when inserting data from GCS (read using the s3Cluster function) into the target MergeTree table: sometimes the results don't show up even though the INSERT command reports success. When debugging with the same command/data, the rows sometimes do show up properly, but most of the time they don't.
I suspected it might be caused by unmerged parts or out-of-sync replicas, but even after trying the commands below, the inconsistency persists.
The commands I tried:
-- Merge
OPTIMIZE TABLE test_db.test_inserts FINAL;
-- Also, checked that there was only one part after OPTIMIZE
select count() parts_count, database, table, partition from system.parts
where table like '%test_inserts%' and active group by database, table, partition
order by parts_count desc;
select * from system.virtual_parts where table = 'test_inserts';
-- Sync replicas
SYSTEM SYNC REPLICA test_db.test_inserts LIGHTWEIGHT;
SELECT * FROM test_db.test_inserts SETTINGS select_sequential_consistency = 1;
Unfortunately, none of them resolved the inconsistency I'm facing.
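One more check that might help rule out a lagging replica is comparing what each node currently sees in the table. This is only a sketch: it assumes the service exposes its nodes under the 'default' cluster (the same name used in the s3Cluster call below).
-- Sketch: raw (pre-merge) row count per node for the target table.
SELECT
    hostName() AS replica,
    count() AS raw_rows
FROM clusterAllReplicas('default', test_db.test_inserts)
GROUP BY replica
ORDER BY replica;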
Below are the steps and queries I used. I can't make this fully reproducible for everyone, since the data comes from a private GCS bucket. The data is read with the s3Cluster table function (note that I also tried the s3 and gcs functions, but they showed the same problem). Running the SELECT part below on its own returns a few aggregated rows (e.g., 5 rows); the full insert command is:
INSERT INTO test_db.test_inserts
SELECT
    column_a,
    countStateIf(column_b = 'abc') AS counts
FROM
(
    SELECT
        column_a,
        column_b
    FROM s3Cluster(
        'default',
        'https://storage.googleapis.com/BUCKET_NAME/PATH/TO/PARQUET/FILES/*.parquet',
        '<Redacted HMAC Key>',
        '<Redacted HMAC Secret>',
        'Parquet'
    )
) gcs
GROUP BY column_a;
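To see whether an INSERT that reports success actually produced a new part on disk, the part log can be checked right after running it. This is a sketch and assumes system.part_log is enabled on the service:
-- Sketch: recent part-level events for the target table.
SELECT event_time, event_type, part_name, rows, error
FROM system.part_log
WHERE database = 'test_db' AND table = 'test_inserts'
ORDER BY event_time DESC
LIMIT 20;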
Below is the CREATE TABLE command for this table:
CREATE TABLE test_db.test_inserts
(
`column_a` UInt16,
`counts` AggregateFunction(count)
)
ENGINE = SharedAggregatingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
ORDER BY (column_a)
SETTINGS index_granularity = 8192;
This is the SELECT I use to check the inserted results:
SELECT
column_a,
countMerge(counts)
FROM test_db.test_inserts
GROUP BY
column_a
ORDER BY
column_a
;
This also happens when running the query through Python's clickhouse_connect (sending it via client.command). The returned query summary shows 5 rows written successfully, but nothing shows up when SELECTing:
{'read_rows': '46324168', 'read_bytes': '480616778', 'written_rows': '5', 'written_bytes': '1960', 'total_rows_to_read': '0', 'result_rows': '5', 'result_bytes': '1960', 'elapsed_ns': '2157374876', 'query_id': '<QUERY_ID>'}
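With the query_id from that summary, the server-side record of the insert can be inspected along these lines. This is a sketch: the ProfileEvents map column and the DuplicatedInsertedBlocks counter exist in recent ClickHouse releases, but it's worth verifying them on the Cloud service.
-- Sketch: what the server recorded for this insert, including whether
-- any of its blocks were silently dropped as duplicates.
SELECT
    event_time,
    type,
    written_rows,
    written_bytes,
    ProfileEvents['DuplicatedInsertedBlocks'] AS deduplicated_blocks
FROM system.query_log
WHERE query_id = '<QUERY_ID>'
  AND type = 'QueryFinish';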
My own observation (just a guess, with no proof) is that this usually happens with queries that insert a small number of rows. I'm not using async inserts, so I'm not sure whether that is related.
Could someone please give me some advice on this, or on how I should investigate it further?
Regarding the s3Cluster table function: yes, deduplication of to-be-inserted blocks can happen when data is ingested in parallel by multiple nodes with s3Cluster, just as it can when data is ingested by a single node only. Each insert thread on each node collects up to min_insert_block_size_rows rows or min_insert_block_size_bytes bytes in memory. This data is then sorted based on the table's sorting key, a hash is computed from that block of sorted data and compared with the hashes already existing in Keeper. If the same hash already exists, the block is discarded; otherwise the block is written to a part on disk and the hash is added to Keeper.
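If that is what is happening here (the aggregated result is small, so repeated runs can easily produce byte-identical blocks), one way to confirm it is to override the deduplication behaviour on the insert itself. The sketch below uses the standard insert_deduplicate and insert_deduplication_token settings; it's worth double-checking their behaviour on your ClickHouse Cloud version.
-- Sketch: disable block deduplication for this one insert to see whether
-- the missing rows were being discarded as duplicate blocks.
INSERT INTO test_db.test_inserts
SETTINGS insert_deduplicate = 0
SELECT
    column_a,
    countStateIf(column_b = 'abc') AS counts
FROM s3Cluster(
    'default',
    'https://storage.googleapis.com/BUCKET_NAME/PATH/TO/PARQUET/FILES/*.parquet',
    '<Redacted HMAC Key>',
    '<Redacted HMAC Secret>',
    'Parquet'
)
GROUP BY column_a;
-- Alternatively, keep deduplication on but give each logical load its own token,
-- e.g. SETTINGS insert_deduplication_token = 'load-2024-06-01-run-1', so retries
-- of the same load are still deduplicated while distinct loads are not.
If the rows appear with insert_deduplicate = 0, block deduplication is very likely what was hiding them.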