I have been having issues with getting a pyspark job to run on an EMR cluster, so I logged into the master node and ran spark-submit directly there
I have a python file that I submit to pyspark and in this file I have:
import subprocess
from pyspark import SparkContext, SparkConf
import boto3
from boto3.s3.transfer import S3Transfer
import os, re
import tarfile
import time
...
When I try to run this in cluster mode, I get: (from yarn logs, trimmed for brevity)
16/01/31 21:45:57 INFO spark.CacheManager: Partition rdd_2_0 not found, computing it
16/01/31 21:45:57 INFO spark.CacheManager: Partition rdd_1_0 not found, computing it
16/01/31 21:45:57 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1454273602144_0005/container_1454273602144_0005_01_000002/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1454273602144_0005/container_1454273602144_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1454273602144_0005/container_1454273602144_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named boto3.s3.transfer
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Later on I get errors about being unable to import boto3.
If I run in client mode, I only get the ImportError about boto3.s3.transfer.
Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-39-79.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1454273602144_0005/container_1454273602144_0005_01_000002/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1454273602144_0005/container_1454273602144_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1454273602144_0005/container_1454273602144_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named boto3.s3.transfer
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
However, if I check pip freeze:
boto3==1.2.3
botocore==1.3.23
If I open the Spark Shell on the master and do this:
import boto3
client = boto3.client("s3")
It works fine.
Is there some sort of virtual environment thing going on here? I'm totally confused.
Edit Forgot to mention that I am using the latest EMR version with Spark 1.6.0.
Also, this works fine on my own machine in local mode.
Well, derp, I found the issue.
Turns out I had to pip install boto3
, EMR nodes do not get this installed by default.
This is one case where the error is quite descriptive.