clickhouse

Inconsistent results when inserting data from `s3Cluster` to ClickHouse Cloud's SharedMergeTree tables


I'm trying to read data from GCS, aggregate it, and insert it into ClickHouse Cloud's SharedMergeTree tables.

I've come across this inconsistent behaviour when trying to insert data from GCS (read using the s3Cluster function) into the target MergeTree table. Sometimes the results don't show up even though the INSERT command reports success. However, when debugging with the same command/data, sometimes they do show up properly (most of the time they don't).

I was thinking it might be caused by unmerged parts or out-of-sync replicas, but after trying the commands below, the inconsistency still persists.

The commands I tried:

-- Merge
OPTIMIZE TABLE test_db.test_inserts FINAL;

-- Also, checked that there was only one part after OPTIMIZE
select count() parts_count, database, table, partition from system.parts
where table like '%test_inserts%' and active group by database, table, partition
order by parts_count desc;

select * from system.virtual_parts where table = 'test_inserts';


-- Sync replicas
SYSTEM SYNC REPLICA test_db.test_inserts LIGHTWEIGHT;
SELECT * FROM test_db.test_inserts SETTINGS select_sequential_consistency = 1;

Unfortunately, none of them resolved the inconsistency I'm facing.
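
Another check I can run, to rule out a replica-visibility problem, is a direct per-replica row count (just a sketch; I'm assuming the cluster is named 'default', the same name I pass to s3Cluster):

-- Sketch: count the raw rows on every replica directly
SELECT hostName() AS host, count() AS raw_rows
FROM clusterAllReplicas('default', test_db.test_inserts)
GROUP BY host;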

Below are the steps and queries I used. I'm not able to provide a way for everyone to reproduce the issue, since the data comes from a private GCS bucket.

  1. Read and aggregate data from GCS using s3Cluster (note that I also tried switching to the s3 and gcs functions, but they had the same problem). Executing this returns a few aggregated rows (e.g., 5 rows):
SELECT
    column_a,
    countStateIf(column_b = 'abc') AS counts
FROM 
(
    SELECT
        column_a,
        column_b
    FROM s3Cluster(
        'default',
        'https://storage.googleapis.com/BUCKET_NAME/PATH/TO/PARQUET/FILES/*.parquet', 
        '<Redacted HMAC Key>',
        '<Redacted HMAC Secret>', 
        'Parquet'
    )
) gcs
GROUP BY column_a
  2. Add INSERT INTO at the top of the above command and execute it; the INSERT reports success.
INSERT INTO test_db.test_inserts

Below is the CREATE TABLE command for this table:

CREATE TABLE test_db.test_inserts
(
    `column_a` UInt16,
    `counts` AggregateFunction(count)
)
ENGINE = SharedAggregatingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
ORDER BY (column_a)
SETTINGS index_granularity = 8192
  3. Execute a SELECT with countMerge on the target table. This is where the inconsistency happens: sometimes the results (e.g., 5 aggregated rows) show up properly, sometimes they don't, even after waiting for a day or two and running all the merge/sync commands above.
SELECT
    column_a,
    countMerge(counts)
FROM test_db.test_inserts
GROUP BY 
    column_a
ORDER BY 
    column_a
;

This also happens when running the query through Python's clickhouse_connect (sending it via client.command). The query summary reports 5 rows written successfully, but nothing shows up when SELECTing:

{'read_rows': '46324168', 'read_bytes': '480616778', 'written_rows': '5', 'written_bytes': '1960', 'total_rows_to_read': '0', 'result_rows': '5', 'result_bytes': '1960', 'elapsed_ns': '2157374876', 'query_id': '<QUERY_ID>'}
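
For what it's worth, a check along these lines (just a sketch; it assumes system.part_log is populated and that 'default' is the cluster name, as in my s3Cluster call) should show whether that query_id actually produced a new part. If the inserted block was discarded as a duplicate, I'd expect no NewPart entry to appear:

-- Sketch: verify whether the INSERT identified by <QUERY_ID> actually wrote a new part
SELECT event_time, event_type, part_name, rows, error
FROM clusterAllReplicas('default', system.part_log)
WHERE query_id = '<QUERY_ID>'
  AND event_type = 'NewPart';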

My own guess, based on observation, is that it usually happens with queries that insert a small number of rows. That's pure speculation with no proof, and I'm not using async inserts, so I'm not sure whether that's related.

Could someone please give me some advice on this, or on how I should investigate it further?


Solution

  • Regarding the s3Cluster table function: yes, deduplication of to-be-inserted blocks can happen when data is ingested in parallel by multiple nodes with s3Cluster, just as it can when data is ingested by a single node. Each insert thread on each node collects up to min_insert_block_size_rows or min_insert_block_size_bytes in memory. That data is then sorted by the table's sorting key, a hash is computed from the sorted block, and the hash is compared with the hashes already stored in Keeper. If the same hash already exists, the block is discarded; otherwise the block is written to a part on disk and its hash is added to Keeper.
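
  • If that block-level deduplication is what is discarding the small re-inserted batches, a possible workaround (just a sketch built on the standard insert settings; only use it if duplicate rows from retries are acceptable, or if you can supply a meaningful token per load) is to disable deduplication for the INSERT, or to control it with an explicit insert_deduplication_token:

-- Sketch: re-run the same INSERT ... SELECT but opt out of block deduplication
INSERT INTO test_db.test_inserts
SETTINGS insert_deduplicate = 0
SELECT column_a, countStateIf(column_b = 'abc') AS counts
FROM s3Cluster('default', 'https://storage.googleapis.com/BUCKET_NAME/PATH/TO/PARQUET/FILES/*.parquet',
               '<Redacted HMAC Key>', '<Redacted HMAC Secret>', 'Parquet')
GROUP BY column_a;

-- Alternative sketch: keep deduplication, but key it on an explicit per-load token
-- ('load-2024-06-01' is a made-up example value)
INSERT INTO test_db.test_inserts
SETTINGS insert_deduplication_token = 'load-2024-06-01'
SELECT column_a, countStateIf(column_b = 'abc') AS counts
FROM s3Cluster('default', 'https://storage.googleapis.com/BUCKET_NAME/PATH/TO/PARQUET/FILES/*.parquet',
               '<Redacted HMAC Key>', '<Redacted HMAC Secret>', 'Parquet')
GROUP BY column_a;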