mapreducehadoop-streaming

os.environ['mapreduce_map_input_file'] doesn't work


I created a simple map reduce in Python, just to test the os.environ['mapreduce_map_input_file'] call, as you can see below:

map.py

#!/usr/bin/python
import sys

# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:

    l = line.strip().split()

    for word in l:

        # output goes to STDOUT (stream data that the program writes)
        print "%s\t%d" %( word, 1 )

reduce.py

#!/usr/bin/python
import sys
import os

current_word = None
current_sum = 0

# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:

    word, count = line.strip().split("\t", 1)

    try:
        count = int(count)
        except ValueError:
        continue

    if word == current_word:
        current_sum += count
    else:
            if current_word:
            # output goes to STDOUT (stream data that the program writes)
            print "%s\t%d" %( current_word, current_sum )
            print (os.environ['mapreduce_map_input_file'])
        current_word = word
        current_sum = count

Error message:

Traceback (most recent call last):
  File "/Users/brunomacedo/Desktop/Big-Data/./reduce.py", line 25, in <module>
    print (os.environ['mapreduce_map_input_file'])
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/UserDict.py", line 23, in __getitem__
    raise KeyError(key)
KeyError: 'mapreduce_map_input_file'
15/03/06 17:50:26 INFO streaming.PipeMapRed: Records R/W=16127/1
15/03/06 17:50:26 INFO streaming.PipeMapRed: MRErrorThread done
15/03/06 17:50:26 WARN streaming.PipeMapRed: java.io.IOException: Stream closed
15/03/06 17:50:26 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:128)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/06 17:50:26 WARN streaming.PipeMapRed: java.io.IOException: Stream closed
15/03/06 17:50:26 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
    at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:237)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/06 17:50:26 INFO mapred.LocalJobRunner: reduce task executor complete.
15/03/06 17:50:26 WARN mapred.LocalJobRunner: job_local1265836882_0001
java.lang.Exception: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
    at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:237)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/06 17:50:27 INFO mapreduce.Job: Job job_local1265836882_0001 failed with state FAILED due to: NA
15/03/06 17:50:27 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=181735210
        FILE: Number of bytes written=292351104
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=0
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=0
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Map-Reduce Framework
        Map input records=100
        Map output records=334328
        Map output bytes=2758691
        Map output materialized bytes=3427947
        Input split bytes=14100
        Combine input records=0
        Combine output records=0
        Reduce input groups=0
        Reduce shuffle bytes=3427947
        Reduce input records=0
        Reduce output records=0
        Spilled Records=334328
        Shuffled Maps =100
        Failed Shuffles=0
        Merged Map outputs=100
        GC time elapsed (ms)=1224
        Total committed heap usage (bytes)=49956257792
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2090035
    File Output Format Counters
        Bytes Written=0
15/03/06 17:50:27 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

Command I use to run it:

hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
-D mapreduce.job.reduces=1 \
-file map.py \
-mapper map.py \
-file reduce.py \
-reducer reduce.py \
-input file:///Users/brunomacedo/Desktop/Big-Data/articles \
-output file:///Users/brunomacedo/Desktop/Big-Data/output

If I take out the line print (os.environ['mapreduce_map_input_file']) it works perfectly. The purpose of this line is to print the name of the input file where the word count came from. Anyway, I did this just to test this command, because I need to use it in a more complicated project.

Can someone please help me what is wrong with this call? Thank you very much!

Edit:

I am using Hadoop 2.6.0


Solution

  • It didn't work when I called os.environ['mapreduce_map_input_file'] or os.environ['map_input_file'] in the reduce file. But it works in the map file. It makes sense, as the reducer is not able to know from which input file your mapper output comes from (unless you send that information directly from the mapper).

    I was unable to run just os.environ['mapreduce_map_input_file'] or os.environ['map_input_file'] directly. But using this in the mapper seems to consistently work:

    try:
        input_file = os.environ['mapreduce_map_input_file']
    except KeyError:
        input_file = os.environ['map_input_file']