I want to solve the word count problem and get the results in reverse sorted order by frequency of occurrence in the file.
Below are the four files I wrote for this purpose (2 mappers and 2 reducers, since a single MapReduce job cannot solve this):
1) mapper1.py
import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue  # skip malformed lines
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        # emit one (word, 1) pair per token
        print "%s\t%d" % (word.lower(), 1)
2) reducer1.py
import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue  # skip malformed lines
    if current_key != key:
        # key changed: flush the finished group before starting the next one
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

# flush the final group
if current_key:
    print "%s\t%d" % (current_key, word_sum)
3) mapper2.py
import sys

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue  # skip malformed lines
    # identity mapper: pass (word, count) through unchanged
    print "%s\t%d" % (word, count)
4) reducer2.py
import sys
for line in sys.stdin:
try:
word, count = line.strip().split('\t', 1)
count = int(count)
except ValueError as e:
continue
print "%s\t%d" % (word, count)
Here are the two yarn commands I run in a bash environment:
OUT_DIR="wordcount_result_1"
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount" \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper1.py,reducer1.py \
-mapper "python mapper1.py" \
-combiner "python reducer1.py" \
-reducer "python reducer1.py" \
-input /test/articles-part-short \
-output ${OUT_DIR} > /dev/null
OUT_DIR_2="wordcount_result_2"
NUM_REDUCERS=1
hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount Rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D map.output.key.field.separator=\t \
-D mapreduce.partition.keycomparator.options=-k2,2nr \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input ${OUT_DIR} \
-output ${OUT_DIR_2} > /dev/null
hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head
This does not give me the right answer. Can someone please explain where it goes wrong?
On the other hand, if in mapper2.py I print the fields swapped,
print "%d\t%s" % (count, word)
read them accordingly in reducer2.py,
count, word = line.strip().split('\t', 1)
and change the comparator option in the 2nd yarn command to
-D mapreduce.partition.keycomparator.options=-k1,1nr
then it gives me the right answer.
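For reference, the complete second command in that working variant (identical to the original except for the comparator option, since the count is now the first field):

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount Rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D map.output.key.field.separator=\t \
-D mapreduce.partition.keycomparator.options=-k1,1nr \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input ${OUT_DIR} \
-output ${OUT_DIR_2} > /dev/null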
Why does it behave differently in these two cases?
Can someone please help me understand the comparator options of Hadoop MapReduce?
This will work:
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k2nr' \
-D stream.num.map.output.key.fields=2 \
-D mapred.map.tasks=1 \
-D mapreduce.job.reduces=1 \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input /user/jovyan/assignment0_1563877099149160 \
-output ${OUT_DIR} > /dev/null
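The key line is -D stream.num.map.output.key.fields=2: it tells Streaming to treat both tab-separated fields (word and count) as the map output key, so KeyFieldBasedComparator actually has a second field for -k2nr to sort on. In the original command the key was only the first field (the word), so -k2,2nr pointed at a field that does not exist inside the key, and no numeric sort took place. You can confirm by inspecting the sorted output:

hdfs dfs -cat ${OUT_DIR}/part-00000 | head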