I created a simple MapReduce job in Python, just to test the os.environ['mapreduce_map_input_file'] call, as you can see below:
map.py

#!/usr/bin/python
import sys

# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        # output goes to STDOUT (stream data that the program writes)
        print "%s\t%d" % (word, 1)
reduce.py

#!/usr/bin/python
import sys
import os

current_word = None
current_sum = 0

# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:
    word, count = line.strip().split("\t", 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if word == current_word:
        current_sum += count
    else:
        if current_word:
            # output goes to STDOUT (stream data that the program writes)
            print "%s\t%d" % (current_word, current_sum)
            print (os.environ['mapreduce_map_input_file'])
        current_word = word
        current_sum = count

# emit the last word group, which the loop above never flushes
if current_word:
    print "%s\t%d" % (current_word, current_sum)
Error message:
Traceback (most recent call last):
File "/Users/brunomacedo/Desktop/Big-Data/./reduce.py", line 25, in <module>
print (os.environ['mapreduce_map_input_file'])
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/UserDict.py", line 23, in __getitem__
raise KeyError(key)
KeyError: 'mapreduce_map_input_file'
15/03/06 17:50:26 INFO streaming.PipeMapRed: Records R/W=16127/1
15/03/06 17:50:26 INFO streaming.PipeMapRed: MRErrorThread done
15/03/06 17:50:26 WARN streaming.PipeMapRed: java.io.IOException: Stream closed
15/03/06 17:50:26 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:128)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/06 17:50:26 WARN streaming.PipeMapRed: java.io.IOException: Stream closed
15/03/06 17:50:26 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:237)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/06 17:50:26 INFO mapred.LocalJobRunner: reduce task executor complete.
15/03/06 17:50:26 WARN mapred.LocalJobRunner: job_local1265836882_0001
java.lang.Exception: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:237)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/06 17:50:27 INFO mapreduce.Job: Job job_local1265836882_0001 failed with state FAILED due to: NA
15/03/06 17:50:27 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=181735210
FILE: Number of bytes written=292351104
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=0
HDFS: Number of bytes written=0
HDFS: Number of read operations=0
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Map-Reduce Framework
Map input records=100
Map output records=334328
Map output bytes=2758691
Map output materialized bytes=3427947
Input split bytes=14100
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=3427947
Reduce input records=0
Reduce output records=0
Spilled Records=334328
Shuffled Maps =100
Failed Shuffles=0
Merged Map outputs=100
GC time elapsed (ms)=1224
Total committed heap usage (bytes)=49956257792
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2090035
File Output Format Counters
Bytes Written=0
15/03/06 17:50:27 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
The command I use to run it:
hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
-D mapreduce.job.reduces=1 \
-file map.py \
-mapper map.py \
-file reduce.py \
-reducer reduce.py \
-input file:///Users/brunomacedo/Desktop/Big-Data/articles \
-output file:///Users/brunomacedo/Desktop/Big-Data/output
If I take out the line print (os.environ['mapreduce_map_input_file']), it works perfectly. The purpose of this line is to print the name of the input file that each word count came from. Anyway, I added it just to test the call, because I need to use it in a more complicated project.
Can someone please tell me what is wrong with this call? Thank you very much!
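For reference, a defensive lookup avoids crashing the task when the variable is absent. This is only a sketch of my own: the helper name get_input_file and the 'unknown' fallback are not part of any Hadoop API.

```python
import os

def get_input_file(environ=os.environ):
    # try the Hadoop 2.x name first, then the pre-2.x spelling;
    # reducer tasks see neither key, so the fallback avoids the KeyError
    for key in ('mapreduce_map_input_file', 'map_input_file'):
        if key in environ:
            return environ[key]
    return 'unknown'

print(get_input_file({'mapreduce_map_input_file': 'file:/articles/a.txt'}))
print(get_input_file({}))  # reducer-like environment: no key set, prints 'unknown'
```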
Edit:

I am using Hadoop 2.6.0.

Calling os.environ['mapreduce_map_input_file'] or os.environ['map_input_file'] did not work in the reduce file, but it does work in the map file. That makes sense: the reducer cannot know which input file a given piece of mapper output came from (unless you send that information explicitly from the mapper).

I was unable to rely on just os.environ['mapreduce_map_input_file'] or os.environ['map_input_file'] on its own, but using the following in the mapper seems to work consistently:
try:
    input_file = os.environ['mapreduce_map_input_file']
except KeyError:
    input_file = os.environ['map_input_file']
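Along those lines, one way to make the file name visible in the reducer is to have the mapper emit it as part of each record. A minimal sketch, assuming a word<TAB>count<TAB>file value format of my own choosing (the input lines are simulated so it runs standalone):

```python
import os

def map_line(line, input_file):
    # emit word<TAB>count<TAB>source so the reducer can recover the file name
    for word in line.strip().split():
        yield "%s\t%d\t%s" % (word, 1, input_file)

# in the real mapper this would come from the task environment and sys.stdin;
# hard-coded sample lines here so the sketch is self-contained
input_file = os.environ.get('mapreduce_map_input_file',
                            os.environ.get('map_input_file', 'unknown'))
for line in ["hello world", "hello again"]:
    for record in map_line(line, input_file):
        print(record)
```

The reducer would then unpack the extra field from each value, e.g. word, count, source = line.strip().split("\t", 2), and could aggregate per (word, source) pair instead of per word.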