I'm having some difficulty making sure I'm leveraging sorted data within a Hive table (using the ORC file format).
I understand we can affect how the data is read from a Hive table by declaring CLUSTERED BY / SORTED BY clauses in the CREATE DDL.
CREATE TABLE trades
(
trade_id INT,
name STRING,
contract_type STRING,
ts INT
)
PARTITIONED BY (dt STRING)
CLUSTERED BY (trade_id) SORTED BY (trade_id, ts) INTO 8 BUCKETS
STORED AS ORC;
This means that every time I query this table, the data will be distributed by trade_id
among the various mappers and sorted afterwards.
My question is:
I do not want the data to be split into N files (buckets), because the volume is not that big and I would end up with many small files.
However, I do want to leverage sorted insertion.
INSERT OVERWRITE TABLE trades
PARTITION (dt)
SELECT trade_id, name, contract_type, ts, dt
FROM raw_trades
DISTRIBUTE BY trade_id
SORT BY trade_id;
Do I really need to use CLUSTERED BY / SORTED BY
in the CREATE DDL statement? Or does Hive/ORC know how to leverage the fact that the insertion process already ensured that the data is sorted?
Could it make sense to do something like:
CLUSTERED BY (trade_id) SORTED BY (trade_id, ts) INTO 1 BUCKETS
Bucketed tables are an outdated concept.
You do not need to write CLUSTERED BY in the table DDL.
When loading the table, use DISTRIBUTE BY <partition key>
to reduce the pressure on reducers, especially when writing ORC, which requires intermediate buffers for building the files; if each reducer writes many partitions, it may cause an OOM exception.
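For example, here is a minimal sketch against the trades/raw_trades tables from the question (an assumption based on the question's schema), distributing only by the partition key:
-- each reducer receives whole dt partitions, so it keeps few ORC writers open at once
INSERT OVERWRITE TABLE trades PARTITION (dt)
SELECT trade_id, name, contract_type, ts, dt
FROM raw_trades
DISTRIBUTE BY dt   -- distribute by the partition key
SORT BY trade_id;  -- sort the rows inside each reducer's output files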
When the table is big, you can limit the maximum file size using bytes per reducer, like this:
set hive.exec.reducers.bytes.per.reducer=67108864; -- 64 MB per reducer, or even less
If you have more data, more reducers will be started and more files will be created. This is more flexible than loading a fixed number of buckets.
This also works better for small tables, because you are not forced to split them into many small bucket files.
ORC has internal indexes and Bloom filters. By applying SORT you can improve the efficiency of the indexes and Bloom filters, because similar data is stored together. This can also improve compression, depending on your data's entropy.
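As an illustration, here is a sketch of how the question's table could be declared without buckets while still getting ORC indexes and a Bloom filter on trade_id (the orc.* entries are standard ORC table properties; the values shown are assumptions to tune for your data):
CREATE TABLE trades
(
trade_id INT,
name STRING,
contract_type STRING,
ts INT
)
PARTITIONED BY (dt STRING)
STORED AS ORC
TBLPROPERTIES (
'orc.create.index'='true',             -- row-group min/max indexes (on by default)
'orc.bloom.filter.columns'='trade_id', -- Bloom filter on the lookup key
'orc.bloom.filter.fpp'='0.05'          -- acceptable false-positive rate
);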
If distributing by the partition key is not enough because the data is big and skewed, you can additionally distribute by something random. It is better to distribute by a column if the data is evenly distributed; if not, distribute by random to avoid the single long-running reducer problem.
Finally, your insert statement may look like this:
set hive.exec.reducers.bytes.per.reducer=33554432; -- 32 MB per reducer
INSERT OVERWRITE TABLE trades PARTITION (dt)
SELECT trade_id, name, contract_type, ts, dt
FROM raw_trades
DISTRIBUTE BY dt,                 -- partition key is a must for big data
    trade_id,                     -- some other key if the data is too big and the key is
                                  -- evenly distributed (no skew)
    FLOOR(RAND()*100.0)%20        -- random, to additionally split into 20 roughly equal parts
SORT BY contract_type;            -- sort the data if you want filtering by this key
                                  -- to work better using the internal index
Do not use CLUSTERED BY in the table DDL: with DISTRIBUTE BY plus SORT during insert, and ORC with its indexes and Bloom filters, you can achieve the same thing in a more flexible way.
Distribute + sort can reduce the size of ORC files dramatically, by 3x or 4x. Similar data compresses better and makes the internal indexes more efficient.
Read also this: https://stackoverflow.com/a/55375261/2700344 and this related answer about sorting: https://stackoverflow.com/a/47416027/2700344
The only case where CLUSTERED BY in the table DDL pays off is when you are joining two big tables that can be bucketed into exactly the same number of buckets on the same key, so that a sort-merge-bucket map join can be used; in practice, it is rare that you can bucket two big tables in the same way. Having only 1 bucket makes no sense: for small tables you can use a map join, and simply sorting the data during insert already reduces the compressed data size.
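If you do hit that rare sort-merge-bucket case, a hedged sketch of what it could look like (the table and column names are illustrative, not from the question; both sides must be clustered and sorted on the join key into the same number of buckets):
CREATE TABLE big_a (k BIGINT, v STRING)
CLUSTERED BY (k) SORTED BY (k) INTO 64 BUCKETS
STORED AS ORC;

CREATE TABLE big_b (k BIGINT, w STRING)
CLUSTERED BY (k) SORTED BY (k) INTO 64 BUCKETS
STORED AS ORC;

-- settings that let Hive choose the sort-merge-bucket map join
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.auto.convert.sortmerge.join=true;

SELECT a.k, a.v, b.w
FROM big_a a
JOIN big_b b ON a.k = b.k;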