python hadoop mapreduce mrjob

Yield both max and min in a single mapreduce


I am a beginner just getting started with writing MapReduce programs in Python using the MRJob library.

One of the examples worked out in the video tutorial finds the max temperature by location_id. Following on from that, writing another program to find the min temperature by location_id is straightforward too.

I am wondering, is there a way to yield both the max and min temperature by location_id in a single MapReduce program? Below is my go at it:

from mrjob.job import MRJob

'''Sample Data
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E, 

Output I am expecting to see:
ITE00100554  32.3  20.2
EZE00100082  34.4  19.6
'''

class MaxMinTemperature(MRJob):
    def mapper(self, _, line):
        location, datetime, measure, temperature, w, x, y, z = line.split(',')
        temperature = float(temperature)/10
        if measure == 'TMAX' or measure == 'TMIN':
            yield location, temperature

    def reducer(self, location, temperatures):
        yield location, max(temperatures), min(temperatures)


if __name__ == '__main__':
    MaxMinTemperature.run()

I get the following error:

File "MaxMinTemperature.py", line 12, in reducer
yield location, max(temperatures), min(temperatures)
ValueError: min() arg is an empty sequence

Is this possible?

Thank you for your assistance.

Shiv


Solution

  • You have two problems in the reducer:

    1. If you check the type of the temperatures argument, you will see that it is a generator. A generator can be traversed only once, so you cannot pass the same generator to both min and max: max exhausts it, and min then receives an empty sequence, which is the ValueError you see. The right solution is to traverse it manually in a single loop. Converting it to a list would also work on small inputs, but it may cause an out-of-memory error on a big enough input, because a list holds all of its elements in memory and a generator does not.

    2. The reducer must yield a two-element tuple of (key, value). So you need to combine the min and max temperatures into another tuple and yield that as the value.

    Complete working solution:

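    from mrjob.job import MRJob


    class MaxMinTemperature(MRJob):
        def mapper(self, _, line):
            location, datetime, measure, temperature, w, x, y, z = line.split(',')
            # Keep only temperature records; skip other measures such as PRCP.
            if measure in ('TMAX', 'TMIN'):
                # Scale the raw value by 1/10, as in the question.
                yield location, float(temperature) / 10

        def reducer(self, location, temperatures):
            # temperatures is a generator, so walk it exactly once.
            min_temp = next(temperatures)
            max_temp = min_temp
            for item in temperatures:
                min_temp = min(item, min_temp)
                max_temp = max(item, max_temp)
            # Yield one (key, value) pair; the value holds both results.
            yield location, (min_temp, max_temp)


    if __name__ == '__main__':
        MaxMinTemperature.run()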
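
To see the generator behaviour outside of a MapReduce job, here is a minimal plain-Python sketch. It does not use mrjob at all: readings() is a hypothetical stand-in for the generator the reducer receives, and min_max() is an illustrative helper name, not part of any library. The values are the four ITE00100554 readings from the sample data, divided by 10.

```python
def min_max(values):
    """Return (min, max) of an iterable in one pass, without storing it."""
    iterator = iter(values)
    try:
        lowest = highest = next(iterator)
    except StopIteration:
        raise ValueError("min_max() arg is an empty sequence") from None
    for value in iterator:
        if value < lowest:
            lowest = value
        elif value > highest:
            highest = value
    return lowest, highest


def readings():
    # Hypothetical stand-in for the generator passed to the reducer.
    yield from (-7.5, -14.8, -6.0, -12.5)


# Reusing one generator: max() consumes it, so min() has nothing left.
temps = readings()
highest = max(temps)
try:
    min(temps)
    second_call_failed = False
except ValueError:
    second_call_failed = True

# A fresh generator, traversed exactly once.
result = min_max(readings())
print(second_call_failed, result)
```

The same loop is what the reducer above does inline. Keeping only two running values means memory use stays constant no matter how many readings a location has.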