Can I use a combiner to compute an average in a MapReduce job?


I want to implement a MapReduce job that reads Parquet files with the following schema:

{
  optional int96 dropoff_datetime;
  optional float dropoff_latitude;
  optional float dropoff_longitude;
  optional int32 dropoff_taxizone_id;
  optional float ehail_fee;
  optional float extra;
  optional float fare_amount;
  optional float improvement_surcharge;
  optional float mta_tax;
  optional int32 passenger_count;
  optional binary payment_type (UTF8);
  optional int96 pickup_datetime;
  optional float pickup_latitude;
  optional float pickup_longitude;
  optional int32 pickup_taxizone_id;
  optional int32 rate_code_id;
  optional binary store_and_fwd_flag (UTF8);
  optional float tip_amount;
  optional float tolls_amount;
  optional float total_amount;
  optional float trip_distance;
  optional binary trip_type (UTF8);
  optional binary vendor_id (UTF8);
  required int64 trip_id;
}

The main objective of the job is to compute the average taxi speed over all trips for each hour of the day (0->23).

My Mapper class computes the speed of each trip together with its pickup hour, so it emits (hour, speed) pairs.
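For reference, the per-record mapper logic might look like the plain-Java sketch below. It is not the asker's code, and it makes several assumptions: the int96 pickup/dropoff timestamps have already been decoded to epoch milliseconds, trip_distance is in miles (so the speed is in mph), and the pickup hour is taken in UTC. The class and method names are hypothetical, and the Hadoop Mapper and Parquet reading plumbing is left out.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the per-trip mapper logic, without Hadoop or Parquet.
// Assumes timestamps are already epoch milliseconds and trip_distance is in miles.
public class HourlySpeedMapperSketch {

    // Returns (pickup hour, speed in mph), or empty if the trip duration is not positive.
    public static Optional<Map.Entry<Integer, Double>> map(long pickupMillis, long dropoffMillis, float tripDistance) {
        long durationMillis = dropoffMillis - pickupMillis;
        // A zero or negative duration would give an infinite or negative speed, so skip it.
        if (durationMillis <= 0) {
            return Optional.empty();
        }
        double hours = durationMillis / 3_600_000.0;
        // Hour of the day (0..23) of the pickup, interpreted in UTC (an assumption).
        int hour = Instant.ofEpochMilli(pickupMillis).atZone(ZoneOffset.UTC).getHour();
        return Optional.of(Map.entry(hour, tripDistance / hours));
    }

    public static void main(String[] args) {
        // A 10-mile, 30-minute trip picked up at 08:00 UTC: hour 8, 20 mph.
        long pickup = Instant.parse("2016-01-01T08:00:00Z").toEpochMilli();
        long dropoff = pickup + 30 * 60 * 1000L;
        System.out.println(map(pickup, dropoff, 10.0f));
    }
}
```

Skipping non-positive durations in the mapper keeps bad records from ever reaching the combiner or reducer.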

The Reducer class should then compute the average speed for each hour.

However, I am wondering whether I can use a combiner class to speed up the data processing. I learned that combiners can only be used with commutative and associative operations, and averaging is not one of them, right?
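The concern is justified: an average of partial averages is generally not the overall average, because each partial average throws away how many values it was computed from. A small plain-Java check, using made-up speeds for a single hour split across two hypothetical map tasks, shows the discrepancy:

```java
import java.util.Arrays;

// Shows why averaging inside the combiner is unsafe: the mean of partial
// means is not, in general, the mean of all the values.
public class MeanOfMeans {

    public static double mean(double... values) {
        return Arrays.stream(values).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // Speeds for the same hour, split across two hypothetical map tasks.
        double[] node1 = {50, 20, 10};
        double[] node2 = {20};

        double trueMean = mean(50, 20, 10, 20);              // 100 / 4 = 25.0
        double meanOfMeans = mean(mean(node1), mean(node2)); // (26.67 + 20) / 2 = 23.33...

        System.out.println("true mean     = " + trueMean);
        System.out.println("mean of means = " + meanOfMeans);
    }
}
```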

Any help will be appreciated.

Thanks :)


Solution

  • A combiner can help with calculating averages. The trick is not to average in the combiner, but to use it to build a running total (a partial sum of speeds plus a record count) that the reducer then uses to calculate the average.

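    As input the combiner would get (hour, (speed, 1)), and as output it should produce (hour, (sum_speed, num_records)). The reducer can then calculate the average for each hour by dividing sum_speed by num_records. This means the mapper has to emit (hour, (speed, 1)) instead of (hour, speed): the combiner's input and output types must match the map output types, and Hadoop may run the combiner zero, one, or several times, so the reducer must accept (sum_speed, num_records) pairs whether or not a combiner ran. Adding sums and counts is commutative and associative, so this is safe.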

    For example, if combiner 1 receives as input:

    (1, (50, 1))
    (1, (20, 1))
    (1, (10, 1))
    (23, (16, 1))
    

    then it would output:

    (1, (80, 3))
    (23, (16, 1))
    

    and if combiner 2 receives as input:

    (1, (20, 1))
    (23, (40, 1))
    

    then it would output:

    (1, (20, 1))
    (23, (40, 1))
    

    The reducer then sums these partial results before dividing:

    (1, (80+20, 3+1)) = (1, (100, 4)) = (1, 25) 
    (23, (16+40, 1+1)) = (23, (56, 2)) = (23, 28)
    

    This gives you the answers in the form (hour, average_speed).
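The scheme can be checked without a cluster. The dependency-free Java sketch below replays the worked example: SumCount stands in for the (sum_speed, num_records) value, combine plays the role of each combiner, and reduce plays the role of the reducer. All class and method names are hypothetical and are not Hadoop APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Dependency-free simulation of the (sum_speed, num_records) pattern.
// In a real Hadoop job SumCount would be a custom Writable value type.
public class AverageWithCombiner {

    public static final class SumCount {
        public final double sum;
        public final long count;

        public SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Merging partial results is associative and commutative,
        // which is what makes it safe to do in a combiner.
        public SumCount plus(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }
    }

    // The mapper emits (hour, (speed, 1)) for every trip.
    public static Map.Entry<Integer, SumCount> emit(int hour, double speed) {
        return Map.entry(hour, new SumCount(speed, 1));
    }

    // Combiner: fold all values for each hour into one partial (sum, count).
    // Its output has the same shape as its input, so it could be applied again.
    public static Map<Integer, SumCount> combine(List<Map.Entry<Integer, SumCount>> records) {
        Map<Integer, SumCount> partial = new TreeMap<>();
        for (Map.Entry<Integer, SumCount> r : records) {
            partial.merge(r.getKey(), r.getValue(), SumCount::plus);
        }
        return partial;
    }

    // Reducer: merge the partials from every combiner, then divide once at the end.
    public static Map<Integer, Double> reduce(List<Map<Integer, SumCount>> partials) {
        Map<Integer, SumCount> totals = new TreeMap<>();
        for (Map<Integer, SumCount> p : partials) {
            p.forEach((hour, sc) -> totals.merge(hour, sc, SumCount::plus));
        }
        Map<Integer, Double> averages = new TreeMap<>();
        totals.forEach((hour, sc) -> averages.put(hour, sc.sum / sc.count));
        return averages;
    }

    public static void main(String[] args) {
        Map<Integer, SumCount> combiner1 = combine(List.of(
                emit(1, 50), emit(1, 20), emit(1, 10), emit(23, 16)));
        Map<Integer, SumCount> combiner2 = combine(List.of(
                emit(1, 20), emit(23, 40)));

        System.out.println(reduce(List.of(combiner1, combiner2))); // {1=25.0, 23=28.0}
    }
}
```

In an actual job, the (sum, count) pair would be a custom Writable used as the map output value, and the combiner would be its own Reducer subclass registered with job.setCombinerClass(...). The reducer itself cannot simply be reused as the combiner here, because it outputs an average rather than a (sum_speed, num_records) pair.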