Tags: python, python-3.x, hadoop, mapreduce, hadoop-streaming

Not getting my expected output from MapReduce using Python code


I am running this code to compute a probability distribution on a Hadoop cluster; my data is a CSV file with around 10k+ rows, and I am using a Google Dataproc cluster to run the job. Kindly tell me how I can get my expected output; there may be a logical issue or a problem with one of the functions.

#!/usr/bin/env python3
"""mapper.py"""
import sys

# Get input lines from stdin
for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()

    # Split it into tokens
    #tokens = line.split()

    #Get probability_mass values
    for probability_mass in line:
        print("None\t{}".format(probability_mass))

#!/usr/bin/env python3
"""reducer.py"""
import sys
from collections import defaultdict


counts = defaultdict(int)

# Get input from stdin
for line in sys.stdin:
    #Remove spaces from beginning and end of the line
    line = line.strip()

    # skip empty lines
    if not line:
        continue  

    # parse the input from mapper.py
    k,v = line.split('\t', 1)
    counts[v] += 1

total = sum(counts.values())
probability_mass = {k:v/total for k,v in counts.items()}
print(probability_mass)

My CSV file looks like this:

probability_mass
10
10
60
10
30
Expected output: the probability of each number.

{10: 0.6, 60: 0.2, 30: 0.2}

But the result currently shows up like this:
{1:0} {0:0} {3:0} {6:0} {1:0} {6:0}

I save this command in nano and then run it:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-n \
-files mapper.py,reducer.py \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-input /tmp/data.csv \
-output /tmp/output

Solution

  • You are splitting the line into individual characters, which would explain why you get 1, 3, 6, 0, etc. as the map keys.

    Don't loop; just print the stripped line as the value. Your mapper doesn't need to be more than this:

    import sys
    for line in sys.stdin:
        print("None\t{}".format(line.strip()))
    

    Then, in the reducer, you're dividing an int by a larger int. Your streaming command runs the scripts with plain python, and where that resolves to Python 2 (common on Hadoop clusters; the python3 shebang is ignored when you invoke python mapper.py explicitly), integer division rounds down to the closest int, which is 0.

    You can fix this by changing the dict to store floats:

    counts = defaultdict(float)
    

    or by making the sum a float:

    total = float(sum(counts.values()))
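
    Putting both fixes together, the full reducer might look like the sketch below (using the float-sum variant). Two lines go beyond the fixes above and are assumptions rather than part of the original answer: skipping the probability_mass header row visible in the sample CSV, and casting keys to int so the printed dict matches the expected {10: 0.6, 60: 0.2, 30: 0.2}.

    #!/usr/bin/env python3
    """reducer.py"""
    import sys
    from collections import defaultdict

    counts = defaultdict(int)

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        # key is always "None"; the value is one number from the CSV
        k, v = line.split('\t', 1)
        if v == "probability_mass":  # assumption: skip the CSV header row
            continue
        counts[int(v)] += 1  # assumption: numeric keys, to match the expected output

    # float() guards against integer division if this runs under Python 2
    total = float(sum(counts.values()))
    probability_mass = {k: v / total for k, v in counts.items()}
    print(probability_mass)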
    

    As mentioned before, this isn't really a Hadoop problem, since you can test and debug the whole pipeline locally:

    cat data.csv | python mapper.py | sort -n | python reducer.py
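
    One more thing worth checking: since the integer-division behavior comes from plain python resolving to Python 2, invoking python3 explicitly in the streaming job avoids it entirely. This assumes python3 is installed on the worker nodes (it is on current Dataproc images); the key-comparator -D options from the original command are omitted here because, with a single None key and a reducer that aggregates everything in a dict, sort order doesn't matter:

    yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper.py,reducer.py \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -input /tmp/data.csv \
    -output /tmp/output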