distributed, clickhouse

ClickHouse distributed query uses a huge amount of network bandwidth when using GROUP BY


Describe the unexpected behaviour

Executing the following SQL statement pushes network usage up to 500 Mb/s and spends approximately 2 seconds on network transmission:

-- Reproducer: top 10 user_id values by row count, read through the
-- Distributed table semanticdb_chatbi.I11066.
SELECT
    user_id,
    count() AS c
FROM semanticdb_chatbi.I11066 AS test -- the alias `test` is not referenced anywhere else in the query
GROUP BY user_id
ORDER BY c DESC
LIMIT 10

How to reproduce

ClickHouse version: 22.3.20.29

Cluster: 4 nodes, 4 shards with 1 replica each

  1. local table:
-- Local (per-shard) storage table; ON CLUSTER issues the DDL across the '{cluster}' cluster.
CREATE TABLE semanticdb_chatbi.I11066_local ON CLUSTER '{cluster}'
(
    `statis_date` DateTime COMMENT 'date',
    `user_id` String COMMENT 'ID',
    `fee` Nullable(Float64) COMMENT '收入' -- the column comment means "income"
)
-- {installation}, {cluster}, {shard} and {replica} are macros substituted on each server.
ENGINE = ReplicatedMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/semanticdb_chatbi/I11066_local', '{replica}')
-- One partition per calendar month of statis_date.
PARTITION BY toYYYYMM(statis_date)
-- The sorting key is statis_date only; user_id is not part of it.
ORDER BY statis_date
SETTINGS index_granularity = 8192 

  2. distributed table:

 -- Distributed table over semanticdb_chatbi.I11066_local on the '{cluster}' cluster.
 -- Same column list as the local table; the sharding key is cityHash64(user_id).
 CREATE TABLE semanticdb_chatbi.I11066
(
    `statis_date` DateTime COMMENT 'date',
    `user_id` String COMMENT 'ID',
    `fee` Nullable(Float64) COMMENT '收入' -- the column comment means "income"
)
ENGINE = Distributed('{cluster}', 'semanticdb_chatbi', 'I11066_local', cityHash64(user_id))

The table holds about 600 million rows.

Query log entry in JSON format:

{
        "meta":
        [
                {
                        "name": "type",
                        "type": "Enum8('QueryStart' = 1, 'QueryFinish' = 2, 'ExceptionBeforeStart' = 3, 'ExceptionWhileProcessing' = 4)"
                },
                {
                        "name": "event_date",
                        "type": "Date"
                },
                {
                        "name": "event_time",
                        "type": "DateTime"
                },
                {
                        "name": "event_time_microseconds",
                        "type": "DateTime64(6)"
                },
                {
                        "name": "query_start_time",
                        "type": "DateTime"
                },
                {
                        "name": "query_start_time_microseconds",
                        "type": "DateTime64(6)"
                },
                {
                        "name": "query_duration_ms",
                        "type": "UInt64"
                },
                {
                        "name": "read_rows",
                        "type": "UInt64"
                },
                {
                        "name": "read_bytes",
                        "type": "UInt64"
                },
                {
                        "name": "written_rows",
                        "type": "UInt64"
                },
                {
                        "name": "written_bytes",
                        "type": "UInt64"
                },
                {
                        "name": "result_rows",
                        "type": "UInt64"
                },
                {
                        "name": "result_bytes",
                        "type": "UInt64"
                },
                {
                        "name": "memory_usage",
                        "type": "UInt64"
                },
                {
                        "name": "current_database",
                        "type": "String"
                },
                {
                        "name": "query",
                        "type": "String"
                },
                {
                        "name": "formatted_query",
                        "type": "String"
                },
                {
                        "name": "normalized_query_hash",
                        "type": "UInt64"
                },
                {
                        "name": "query_kind",
                        "type": "LowCardinality(String)"
                },
                {
                        "name": "databases",
                        "type": "Array(LowCardinality(String))"
                },
                {
                        "name": "tables",
                        "type": "Array(LowCardinality(String))"
                },
                {
                        "name": "columns",
                        "type": "Array(LowCardinality(String))"
                },
                {
                        "name": "projections",
                        "type": "Array(LowCardinality(String))"
                },
                {
                        "name": "views",
                        "type": "Array(LowCardinality(String))"
                },
                {
                        "name": "exception_code",
                        "type": "Int32"
                },
                {
                        "name": "exception",
                        "type": "String"
                },
                {
                        "name": "stack_trace",
                        "type": "String"
                },
                {
                        "name": "is_initial_query",
                        "type": "UInt8"
                },
                {
                        "name": "user",
                        "type": "String"
                },
                {
                        "name": "query_id",
                        "type": "String"
                },
                {
                        "name": "address",
                        "type": "IPv6"
                },
                {
                        "name": "port",
                        "type": "UInt16"
                },
                {
                        "name": "initial_user",
                        "type": "String"
                },
                {
                        "name": "initial_query_id",
                        "type": "String"
                },
                {
                        "name": "initial_address",
                        "type": "IPv6"
                },
                {
                        "name": "initial_port",
                        "type": "UInt16"
                },
                {
                        "name": "initial_query_start_time",
                        "type": "DateTime"
                },
                {
                        "name": "initial_query_start_time_microseconds",
                        "type": "DateTime64(6)"
                },
                {
                        "name": "interface",
                        "type": "UInt8"
                },
                {
                        "name": "os_user",
                        "type": "String"
                },
                {
                        "name": "client_hostname",
                        "type": "String"
                },
                {
                        "name": "client_name",
                        "type": "String"
                },
                {
                        "name": "client_revision",
                        "type": "UInt32"
                },
                {
                        "name": "client_version_major",
                        "type": "UInt32"
                },
                {
                        "name": "client_version_minor",
                        "type": "UInt32"
                },
                {
                        "name": "client_version_patch",
                        "type": "UInt32"
                },
                {
                        "name": "http_method",
                        "type": "UInt8"
                },
                {
                        "name": "http_user_agent",
                        "type": "String"
                },
                {
                        "name": "http_referer",
                        "type": "String"
                },
                {
                        "name": "forwarded_for",
                        "type": "String"
                },
                {
                        "name": "quota_key",
                        "type": "String"
                },
                {
                        "name": "distributed_depth",
                        "type": "UInt64"
                },
                {
                        "name": "revision",
                        "type": "UInt32"
                },
                {
                        "name": "log_comment",
                        "type": "String"
                },
                {
                        "name": "thread_ids",
                        "type": "Array(UInt64)"
                },
                {
                        "name": "ProfileEvents",
                        "type": "Map(String, UInt64)"
                },
                {
                        "name": "Settings",
                        "type": "Map(String, String)"
                },
                {
                        "name": "used_aggregate_functions",
                        "type": "Array(String)"
                },
                {
                        "name": "used_aggregate_function_combinators",
                        "type": "Array(String)"
                },
                {
                        "name": "used_database_engines",
                        "type": "Array(String)"
                },
                {
                        "name": "used_data_type_families",
                        "type": "Array(String)"
                },
                {
                        "name": "used_dictionaries",
                        "type": "Array(String)"
                },
                {
                        "name": "used_formats",
                        "type": "Array(String)"
                },
                {
                        "name": "used_functions",
                        "type": "Array(String)"
                },
                {
                        "name": "used_storages",
                        "type": "Array(String)"
                },
                {
                        "name": "used_table_functions",
                        "type": "Array(String)"
                }
        ],

        "data":
        [
                {
                        "type": "QueryFinish",
                        "event_date": "2024-03-09",
                        "event_time": "2024-03-09 15:39:15",
                        "event_time_microseconds": "2024-03-09 15:39:15.535252",
                        "query_start_time": "2024-03-09 15:39:11",
                        "query_start_time_microseconds": "2024-03-09 15:39:11.698048",
                        "query_duration_ms": "3836",
                        "read_rows": "642330674",
                        "read_bytes": "14131246634",
                        "written_rows": "0",
                        "written_bytes": "0",
                        "result_rows": "10",
                        "result_bytes": "512",
                        "memory_usage": "89259629",
                        "current_database": "semanticdb_chatbi",
                        "query": "SELECT\n    user_id,\n    count() AS c\nFROM semanticdb_chatbi.I11066 AS test\nGROUP BY user_id\nORDER BY c DESC\nLIMIT 10",
                        "formatted_query": "",
                        "normalized_query_hash": "10840967859924656691",
                        "query_kind": "Select",
                        "databases": ["semanticdb_chatbi"],
                        "tables": ["semanticdb_chatbi.I11066"],
                        "columns": ["semanticdb_chatbi.I11066.user_id"],
                        "projections": [],
                        "views": [],
                        "exception_code": 0,
                        "exception": "",
                        "stack_trace": "",
                        "is_initial_query": 1,
                        "user": "default",
                        "query_id": "f238bb70-a22b-4294-aec4-138a4705e025",
                        "address": "::ffff:127.0.0.1",
                        "port": 45112,
                        "initial_user": "default",
                        "initial_query_id": "f238bb70-a22b-4294-aec4-138a4705e025",
                        "initial_address": "::ffff:127.0.0.1",
                        "initial_port": 45112,
                        "initial_query_start_time": "2024-03-09 15:39:11",
                        "initial_query_start_time_microseconds": "2024-03-09 15:39:11.698048",
                        "interface": 1,
                        "os_user": "",
                        "client_hostname": "chi-clickhouse-local-storage-local-storage-2-0-0.chi-clickhouse-local-storage-local-storage-2-0.chatbi.svc.cluster.local",
                        "client_name": "ClickHouse ",
                        "client_revision": 54455,
                        "client_version_major": 22,
                        "client_version_minor": 3,
                        "client_version_patch": 20,
                        "http_method": 0,
                        "http_user_agent": "",
                        "http_referer": "",
                        "forwarded_for": "",
                        "quota_key": "",
                        "distributed_depth": "0",
                        "revision": 54460,
                        "log_comment": "",
                        "thread_ids": ["16904","16635","15904","13174","16560","16255","16430","16472","16514","16518","15691","16188","12493","9694","15323","16451","16013","9136","12298","16125","16384","16730","13895","16574","16571","16067","15088","14924","16176","14574","14369","15872","9126","15618"],
                        "ProfileEvents": {"Query":"1","SelectQuery":"1","ReadCompressedBytes":"43069298","CompressedReadBufferBlocks":"774","CompressedReadBufferBytes":"59988459","IOBufferAllocs":"6","IOBufferAllocBytes":"3389588","ArenaAllocChunks":"5888","ArenaAllocBytes":"196083712","NetworkReceiveElapsedMicroseconds":"72311","NetworkSendElapsedMicroseconds":"5572","NetworkReceiveBytes":"68305516","NetworkSendBytes":"5749097","SelectedRows":"642330674","SelectedBytes":"14131246634","ContextLock":"3712","RWLockAcquiredReadLocks":"2","RealTimeMicroseconds":"130211245","UserTimeMicroseconds":"1796646","SystemTimeMicroseconds":"183466","SoftPageFaults":"1153","OSCPUWaitMicroseconds":"746687","OSCPUVirtualTimeMicroseconds":"1979942","OSWriteBytes":"61440","OSReadChars":"56158","OSWriteChars":"136752","QueryProfilerRuns":"133"},
                        "Settings": {"connect_timeout_with_failover_ms":"1000","load_balancing":"nearest_hostname","distributed_aggregation_memory_efficient":"1","do_not_merge_across_partitions_select_final":"1","os_thread_priority":"2","log_queries":"1","max_memory_usage":"10000000000","prefer_localhost_replica":"0","parallel_view_processing":"1"},
                        "used_aggregate_functions": ["count"],
                        "used_aggregate_function_combinators": [],
                        "used_database_engines": [],
                        "used_data_type_families": ["Int64","Enum8","UInt64","DateTime","AggregateFunction","String"],
                        "used_dictionaries": [],
                        "used_formats": [],
                        "used_functions": [],
                        "used_storages": [],
                        "used_table_functions": []
                }

        ],

        "rows": 1,

        "rows_before_limit_at_least": 1,

        "statistics":
        {
                "elapsed": 0.044576104,
                "rows_read": 47068,
                "bytes_read": 13481440
        }
}

Expected behaviour

I think ClickHouse (CK) should first calculate the top 10 values of c locally on each node and then merge the 4 × 10 results globally, so only 40 rows should need to be sent across the cluster.

More Information

I thought that maybe CK does not apply the limit locally, so I tried grouping by user gender without a LIMIT clause. That yields only 2 rows on each node after the GROUP BY. The network usage dropped, but it was still very high: 20 Mb/s.

What I need

  1. I'd like to know why the statement causes so much network traffic.
  2. How can the query be optimized?

thanks


Solution

  • Merge the top 10 groups from each node.
    Note that this is logically the same as getting the top 10 groups from the whole table, because the GROUP BY column is the same column that is used in the distribution (sharding) key expression.

    -- Per-shard top 10, merged on the node that receives the query.
    -- cluster(..., view(...)) evaluates the inner SELECT against the local table
    -- on each shard, so a shard hands back at most 10 rows.
    -- NOTE: this equals the global top 10 only when every user_id is stored on
    -- exactly one shard, i.e. rows were placed by the sharding key cityHash64(user_id).
    SELECT
        user_id,
        sum(shard_cnt) AS cnt
    FROM cluster(
        '{cluster}',
        view(
            SELECT
                user_id,
                count() AS shard_cnt
            FROM semanticdb_chatbi.I11066_local
            GROUP BY user_id
            ORDER BY shard_cnt DESC
            LIMIT 10
        )
    )
    GROUP BY user_id
    ORDER BY cnt DESC
    LIMIT 10