python, mrjob

How do I install Python modules like numpy and pandas when running MapReduce code written with mrjob on a Hadoop cluster?


I am trying to run a MapReduce job on a Hadoop cluster with Python's mrjob, and I am having trouble installing external modules such as numpy and pandas. My mrjob code:

import sys

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import TextProtocol
import numpy as np


class UserItemMatrix(MRJob):
    OUTPUT_PROTOCOL = TextProtocol

    def create_user_item_matrix_mapper(self, _, line):
        key, value = line.strip().split("\t")
        key = key.strip().split(";")
        if len(key) == 1:
            yield key[0], value
            return
        user, item = key
        rating = value.strip().split(";")[0]

        yield user, f"{item};{rating}"

    def configure_args(self):
        super(UserItemMatrix, self).configure_args()
        self.add_file_arg("--items-path", help="Path to the items file")

    def create_item_list(self, filename):
        items = []
        with open(filename, "r") as file:
            for line in file:
                item = line.strip()  # Remove leading/trailing whitespaces and newlines
                items.append(float(item))
        return items

    def create_user_item_matrix_reducer_init(self):
        items_path = self.options.items_path
        self.items = self.create_item_list(items_path)

    def create_user_item_matrix_reducer(self, user, values):
        values = [value.strip().split(";") for value in values]
        values = np.array(values, dtype="object")
        # Find rows with length 1
        rows_to_remove = np.array([len(row) == 1 for row in values])

        # Use boolean indexing to create a new array with rows of length 1
        removed_rows = values[rows_to_remove]
        avg_rating = removed_rows[0][0]

        # Use boolean indexing to remove rows from the original array
        coordinates = values[~rows_to_remove]
        coordinates = np.vstack(coordinates).astype(float)

        result = []
        for item in self.items:
            found = False
            for user_item, rating in coordinates:
                if float(user_item) == item:
                    result.append(f"{item};{rating}")
                    found = True
                    break
            if not found:
                result.append(f"{item};{avg_rating}")
        result = "|".join(result)
        yield user, result

    def steps(self):
        return [
            MRStep(
                mapper=self.create_user_item_matrix_mapper,
                reducer_init=self.create_user_item_matrix_reducer_init,
                reducer=self.create_user_item_matrix_reducer,
            )
        ]


if __name__ == "__main__":
    sys.argv[1:] = [
        "./input/input_file_copy.txt",
        "./clustering/output/avg_ratings.txt",
        "--items-path",
        "./input/items.txt",
        "-r",
        "hadoop",
    ]
    UserItemMatrix().run()

My mrjob.conf file:

{
  "runners": {
    "hadoop": {
      "setup": [
        "sudo pip install numpy"
      ]
    }
  }
}

My job fails with the following output:

Using configs in /home/mackop/.mrjob.conf
Looking for hadoop binary in /home/mackop/hadoop-3.3.6/bin...
Found hadoop binary: /home/mackop/hadoop-3.3.6/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /home/mackop/hadoop-3.3.6...
Found Hadoop streaming jar: /home/mackop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/create_user_item_matrix.mackop.20240328.135901.991160
uploading working dir files to hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd...
Copying other local files to hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar6233086433744078313/] [] /tmp/streamjob1183834584487146058.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/mackop/.staging/job_1711623965898_0001
  Total input files to process : 2
  number of splits:3
  Submitting tokens for job: job_1711623965898_0001
  Executing with tokens: []
  resource-types.xml not found
  Unable to find 'resource-types.xml'.
  Submitted application application_1711623965898_0001
  The url to track the job: http://vivobook:8088/proxy/application_1711623965898_0001/
  Running job: job_1711623965898_0001
  Job job_1711623965898_0001 running in uber mode : false
   map 0% reduce 0%
  Task Id : attempt_1711623965898_0001_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  [the same PipeMapRed.waitOutputThreads() stack trace is repeated for task attempts attempt_1711623965898_0001_m_000001_0 through attempt_1711623965898_0001_m_000002_2]

   map 100% reduce 100%
  Job job_1711623965898_0001 failed with state FAILED due to: Task failed task_1711623965898_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0

  Job not successful!
  Streaming Command Failed!
Counters: 14
        Job Counters 
                Data-local map tasks=3
                Failed map tasks=10
                Killed map tasks=2
                Killed reduce tasks=1
                Launched map tasks=12
                Other local map tasks=9
                Total megabyte-milliseconds taken by all map tasks=31662080
                Total time spent by all map tasks (ms)=30920
                Total time spent by all maps in occupied slots (ms)=30920
                Total time spent by all reduces in occupied slots (ms)=0
                Total vcore-milliseconds taken by all map tasks=30920
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
Scanning logs for probable cause of failure...
Looking for history log in hdfs:///tmp/hadoop-yarn/staging...
Looking for history log in /home/mackop/hadoop-3.3.6/logs...
Looking for task syslogs in /home/mackop/hadoop-3.3.6/logs/userlogs/application_1711623965898_0001...

Probable cause of failure:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)


Step 1 of 1 failed: Command '['/home/mackop/hadoop-3.3.6/bin/hadoop', 'jar', '/home/mackop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar', '-files', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/create_user_item_matrix.py#create_user_item_matrix.py,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/items.txt#items.txt,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/mrjob.zip#mrjob.zip,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/setup-wrapper.sh#setup-wrapper.sh', '-input', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/input_file_copy.txt', '-input', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/avg_ratings.txt', '-output', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/output', '-mapper', '/bin/sh -ex setup-wrapper.sh python3 create_user_item_matrix.py --step-num=0 --mapper --items-path items.txt', '-reducer', '/bin/sh -ex setup-wrapper.sh python3 create_user_item_matrix.py --step-num=0 --reducer --items-path items.txt']' returned non-zero exit status 256.

The job works fine when I run it with the inline runner. The error only appears when I run it on the Hadoop cluster. I think the problem is that the tasks can't import numpy.


Solution

  • Firstly, next time please include the log of the failed task attempt (click through to the task logs in the YARN tracking UI; see the command at the end of this answer for pulling them from the terminal).

    Next, I think the problem comes from the sudo in your setup command: the streaming task has no way to enter a password, so the pip install fails.

    Moreover, I suggest using a virtualenv rather than installing the library system-wide. The setup in your config file should be:

    runners:
      hadoop:
        setup:
        - 'set -e'
        - VENV=/tmp/$mapreduce_job_id
        - if [ ! -e $VENV ]; then virtualenv $VENV; fi
        - . $VENV/bin/activate
        - 'pip install numpy'

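    If you would rather keep your existing JSON-style mrjob.conf, the same setup can be expressed roughly like this (a sketch of the equivalent JSON; the commands are identical to the YAML above):

    {
      "runners": {
        "hadoop": {
          "setup": [
            "set -e",
            "VENV=/tmp/$mapreduce_job_id",
            "if [ ! -e $VENV ]; then virtualenv $VENV; fi",
            ". $VENV/bin/activate",
            "pip install numpy"
          ]
        }
      }
    }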

    Remember that virtualenv itself should be installed as the root user. Please refer to this thread as well.
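
    To see the actual Python error next time, you can also pull the aggregated task logs from the command line after the application finishes (assuming YARN log aggregation is enabled on your cluster), for example:

    # fetch the aggregated logs for the failed application shown in the output above
    yarn logs -applicationId application_1711623965898_0001 | less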