I'm working on a slightly improved version of the famous WordCount program, which should output what percentage of the book each word takes up. For example:
...
war 0.00002332423%
peace 0.0034234324%
...
Basically, I need to count all the words, count the occurrences of each word, and divide each count by the total. So there should be at least two jobs:
Job1

- Reads the `input` directory and produces two output directories: `output1` and `output2`.
- Map: writes pairs `(word, 1)` to `output1` and pairs `("total_count", 1)` to `output2`.
- Reduce: sums the pairs to `(word, n)` in `output1` and calculates the total count to make `("total_count", N)` in `output2` (the summing logic is sketched right after this list).
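The reduce step of Job1 is essentially the standard summing reducer; here is a minimal sketch (the class name is illustrative). Note that the same summation turns the `("total_count", 1)` pairs into `("total_count", N)`; routing that one pair to `output2` instead of `output1` is the part I haven't solved (see my problems below).

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the 1s emitted by the mapper: (word, 1)* -> (word, n).
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
```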
Job2

- Takes `output1` and `output2` as input folders and writes the results to `output3`.
- Map: does nothing (identity).
- Reduce: divides each word count by `total_count` and writes the result to `output3`.
My problems:
1. I'd like to avoid going through the original input twice, which is why I'm trying to calculate both the word counts and the total count in Job1. But I do not understand how to avoid mixing up the two results in one output. I have tried to use `MultipleOutputs`, but in this case the results of the mapper do not get into the reducer (see the sketch after this list).
2. Job2 requires multiple inputs, and moreover it needs to read `output2` first, because without the total count it is useless to read the results from `output1`. I feel that this is the wrong way of working with MapReduce (we should not need any kind of synchronization), but I do not see the correct one.
3. The mapper in Job2 does nothing useful and will just waste processor time.
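For reference, my `MultipleOutputs` attempt looked roughly like the sketch below (simplified; the named outputs `"words"` and `"total"` are illustrative and would be registered with `MultipleOutputs.addNamedOutput` in the driver). As far as I can tell, records written through `MultipleOutputs` go directly to output files and bypass the shuffle, which would explain why nothing reaches the reducer.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class TwoOutputsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private static final Text TOTAL_KEY = new Text("total_count");
    private final Text word = new Text();
    private MultipleOutputs<Text, IntWritable> out;

    @Override
    protected void setup(Context context) {
        out = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            if (token.isEmpty()) {
                continue;
            }
            word.set(token);
            // Intended for output1, but written straight to the "words"
            // files -- these pairs never reach the reducer.
            out.write("words", word, ONE);
            // Same problem for the ("total_count", 1) pairs meant for output2.
            out.write("total", TOTAL_KEY, ONE);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        out.close();
    }
}
```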
Just a thought on using a single Job:
`total_count` can be calculated from the map phase of the first job. In fact, it is already counted as `MAP_OUTPUT_RECORDS`: this counter is the total number of map output `(key, value)` pairs. Since you always emit 1 as the value, that number equals the sum of all the values, i.e., the total number of words in your document (with repetition).
Now, I don't know whether you can actually get at this counter from within the reducers. If you can, you could just output for each word the pair `(word, wordCount / MAP_OUTPUT_RECORDS)`. I think you can try this through:
New API:

```java
context.getCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue();
```

Old API:

```java
reporter.getCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue();
```
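Putting it together, the reduce side could look like the sketch below (the class name is illustrative, and whether `getCounter` returns the already-aggregated job-wide value at reduce time is exactly the part I'm unsure about):

```java
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PercentageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        long wordCount = 0;
        for (IntWritable count : counts) {
            wordCount += count.get();
        }
        // Unverified assumption: the counter is visible here and already
        // holds the global number of map output records.
        long totalWords = context
                .getCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS")
                .getValue();
        // Emits the fraction wordCount / totalWords; multiply by 100.0
        // if you want the percentage from the question.
        context.write(word, new DoubleWritable((double) wordCount / totalWords));
    }
}
```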