hadoophivemapreducehiveqlhadoop-partitioning

How map reduce is being performed in this HiveQL query?


FROM (
  FROM pv_users
  SELECT TRANSFORM(pv_users.userid, pv_users.date)
  USING 'python mapper.py'
  AS dt, uid
  CLUSTER BY dt) map_output
INSERT OVERWRITE TABLE pv_users_reduced
SELECT TRANSFORM map_output.dt, map_output.uid
  USING 'python reducer.py'
  AS date, count;

How map reduce is working in this query and what is the significance of "CLUSTER BY" in this query?


Solution

  • Each mapper will read file splits, do something with their splits (for example pre-aggregation like distinct) and produce dt, uid grouped and sorted by dt, so different dt will be put in different files which will be consumed by reducers on the next step.

    Reducers will read files prepared by mappers, so records with the same dt will be read by the same reducer because records were distributed by dt and sorted on mapper. Reducer will merge partial results(files from mappers) and do some count aggregation. If some dt were in the same file, records are sorted, it reduces the amount of work to be done on reducer.

    cluster by dt = distribute by dt sort by dt

    Without cluster by, two reducers may receive same dt, this will make impossible to perform count correctly because reducers do not know about each other and do not share data between them, same dt will be counted partially on different reducers, final result will contain multiple records with the same dt