I am trying to run a MapReduce job on a Hadoop cluster with Python mrjob, and I am having trouble installing external modules such as numpy and pandas. My mrjob code:
import sys

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import TextProtocol
import numpy as np


class UserItemMatrix(MRJob):
    OUTPUT_PROTOCOL = TextProtocol

    def create_user_item_matrix_mapper(self, _, line):
        key, value = line.strip().split("\t")
        key = key.strip().split(";")
        if len(key) == 1:
            yield key[0], value
            return
        user, item = key
        rating = value.strip().split(";")[0]
        yield user, f"{item};{rating}"

    def configure_args(self):
        super(UserItemMatrix, self).configure_args()
        self.add_file_arg("--items-path", help="Path to the items file")

    def create_item_list(self, filename):
        items = []
        with open(filename, "r") as file:
            for line in file:
                item = line.strip()  # Remove leading/trailing whitespaces and newlines
                items.append(float(item))
        return items

    def create_user_item_matrix_reducer_init(self):
        items_path = self.options.items_path
        self.items = self.create_item_list(items_path)

    def create_user_item_matrix_reducer(self, user, values):
        values = [value.strip().split(";") for value in values]
        values = np.array(values, dtype="object")
        # Find rows with length 1
        rows_to_remove = np.array([len(row) == 1 for row in values])
        # Use boolean indexing to create a new array with rows of length 1
        removed_rows = values[rows_to_remove]
        avg_rating = removed_rows[0][0]
        # Use boolean indexing to remove rows from the original array
        coordinates = values[~rows_to_remove]
        coordinates = np.vstack(coordinates).astype(float)
        result = []
        for item in self.items:
            found = False
            for user_item, rating in coordinates:
                if float(user_item) == item:
                    result.append(f"{item};{rating}")
                    found = True
                    break
            if not found:
                result.append(f"{item};{avg_rating}")
        result = "|".join(result)
        yield user, result

    def steps(self):
        return [
            MRStep(
                mapper=self.create_user_item_matrix_mapper,
                reducer_init=self.create_user_item_matrix_reducer_init,
                reducer=self.create_user_item_matrix_reducer,
            )
        ]


if __name__ == "__main__":
    sys.argv[1:] = [
        "./input/input_file_copy.txt",
        "./clustering/output/avg_ratings.txt",
        "--items-path",
        "./input/items.txt",
        "-r",
        "hadoop",
    ]
    UserItemMatrix().run()
My mrjob.conf file:
{
    "runners": {
        "hadoop": {
            "setup": [
                "sudo pip install numpy"
            ]
        }
    }
}
My job fails with the following output:
Using configs in /home/mackop/.mrjob.conf
Looking for hadoop binary in /home/mackop/hadoop-3.3.6/bin...
Found hadoop binary: /home/mackop/hadoop-3.3.6/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /home/mackop/hadoop-3.3.6...
Found Hadoop streaming jar: /home/mackop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/create_user_item_matrix.mackop.20240328.135901.991160
uploading working dir files to hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd...
Copying other local files to hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/
Running step 1 of 1...
packageJobJar: [/tmp/hadoop-unjar6233086433744078313/] [] /tmp/streamjob1183834584487146058.jar tmpDir=null
Connecting to ResourceManager at /0.0.0.0:8032
Connecting to ResourceManager at /0.0.0.0:8032
Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/mackop/.staging/job_1711623965898_0001
Total input files to process : 2
number of splits:3
Submitting tokens for job: job_1711623965898_0001
Executing with tokens: []
resource-types.xml not found
Unable to find 'resource-types.xml'.
Submitted application application_1711623965898_0001
The url to track the job: http://vivobook:8088/proxy/application_1711623965898_0001/
Running job: job_1711623965898_0001
Job job_1711623965898_0001 running in uber mode : false
map 0% reduce 0%
Task Id : attempt_1711623965898_0001_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)
[the identical "PipeMapRed.waitOutputThreads(): subprocess failed with code 1" stack trace is repeated for the remaining eight map task attempts]
map 100% reduce 100%
Job job_1711623965898_0001 failed with state FAILED due to: Task failed task_1711623965898_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0
Job not successful!
Streaming Command Failed!
Counters: 14
    Job Counters
        Data-local map tasks=3
        Failed map tasks=10
        Killed map tasks=2
        Killed reduce tasks=1
        Launched map tasks=12
        Other local map tasks=9
        Total megabyte-milliseconds taken by all map tasks=31662080
        Total time spent by all map tasks (ms)=30920
        Total time spent by all maps in occupied slots (ms)=30920
        Total time spent by all reduces in occupied slots (ms)=0
        Total vcore-milliseconds taken by all map tasks=30920
    Map-Reduce Framework
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
Scanning logs for probable cause of failure...
Looking for history log in hdfs:///tmp/hadoop-yarn/staging...
Looking for history log in /home/mackop/hadoop-3.3.6/logs...
Looking for task syslogs in /home/mackop/hadoop-3.3.6/logs/userlogs/application_1711623965898_0001...
Probable cause of failure:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)
Step 1 of 1 failed: Command '['/home/mackop/hadoop-3.3.6/bin/hadoop', 'jar', '/home/mackop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar', '-files', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/create_user_item_matrix.py#create_user_item_matrix.py,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/items.txt#items.txt,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/mrjob.zip#mrjob.zip,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/setup-wrapper.sh#setup-wrapper.sh', '-input', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/input_file_copy.txt', '-input', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/avg_ratings.txt', '-output', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/output', '-mapper', '/bin/sh -ex setup-wrapper.sh python3 create_user_item_matrix.py --step-num=0 --mapper --items-path items.txt', '-reducer', '/bin/sh -ex setup-wrapper.sh python3 create_user_item_matrix.py --step-num=0 --reducer --items-path items.txt']' returned non-zero exit status 256.
The job works fine when I run it inline; the error only appears when I run it on the Hadoop cluster, so I think the problem is that the tasks cannot import numpy.
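For reference, this is roughly how I run it inline (a minimal sketch; same arguments as above, only the runner changes from hadoop to inline):

if __name__ == "__main__":
    sys.argv[1:] = [
        "./input/input_file_copy.txt",
        "./clustering/output/avg_ratings.txt",
        "--items-path",
        "./input/items.txt",
        "-r",
        "inline",  # inline runner: runs in the local Python process, so it uses the locally installed numpy
    ]
    UserItemMatrix().run()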
First, next time please include the log of a failing task attempt (click through to the task logs from the tracking URL, or run yarn logs -applicationId application_1711623965898_0001); it contains the actual Python traceback rather than just the generic streaming error.
Second, I think the problem comes from the sudo in your setup command: the setup script runs non-interactively on the task nodes, so sudo cannot prompt for a password and the command fails.
Moreover, I suggest using a virtualenv rather than installing the library system-wide. The setup section of your config should look like this:
runners:
  hadoop:
    setup:
      - 'set -e'
      - VENV=/tmp/$mapreduce_job_id
      - if [ ! -e $VENV ]; then virtualenv $VENV; fi
      - . $VENV/bin/activate
      - 'pip install numpy'
Remember that virtualenv itself should already be installed on the task nodes (as the root user). Please refer to this thread as well.
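If you want the real cause to show up in the task logs instead of a bare "subprocess failed with code 1", you can also guard the import at the top of the job script. This is just a sketch, not part of your original code; the message ends up in the failed attempt's stderr log:

# Hypothetical guard at the top of create_user_item_matrix.py:
# if numpy is missing on a worker node, write a clear message to stderr
# before re-raising, so the task attempt logs show the real cause.
import sys

try:
    import numpy as np
except ImportError as exc:
    sys.stderr.write(f"numpy is not importable on this node: {exc}\n")
    raise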