I'm trying to create a cluster on EMR on EC2 to process my jobs. I have created a Spark cluster with the default VPC.
I have checked my IAM role and allowed "s3:CreateBucket", "s3:DeleteObject", and "s3:ListBucket" on my AWS S3 bucket.
I am using emr-7.10.0 and my installed applications include
I have created a .sh file called boostrap.sh, which is in the same folder as preprocess_audio.py. This is what is inside boostrap.sh:
pip install librosa s3fs pyarrow
and this is what is in preprocess_audio.py:
# preprocess_audio.py
import io, numpy as np, librosa, soundfile as sf, pandas as pd
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import pandas_udf

# ---- feature fn ----
def extract_audio_features_bytes(raw_bytes, n_mfcc=13):
    try:
        y, sr = sf.read(io.BytesIO(raw_bytes), dtype='float32', always_2d=False)
    except Exception:
        y, sr = librosa.load(io.BytesIO(raw_bytes), sr=None, mono=True)
    if y.ndim > 1:
        y = y.mean(axis=1)
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
    feats = np.concatenate([mfccs.mean(axis=1), mfccs.std(axis=1)]).astype(float).tolist()
    return feats

schema = T.StructType([
    T.StructField("path", T.StringType(), False),
    T.StructField("features", T.ArrayType(T.DoubleType()), False),
    T.StructField("label", T.IntegerType(), False),
])

@pandas_udf(schema)
def featurize_udf(path_s, content_s):
    rows = []
    for p, b in zip(path_s, content_s):
        feats = extract_audio_features_bytes(b)
        # label from path
        pl = p.lower()
        label = 1 if "/deceptive/" in pl else 0
        rows.append((p, feats, label))
    return pd.DataFrame(rows, columns=["path", "features", "label"])
if __name__ == "__main__":
    spark = SparkSession.builder.appName("audio-mfcc-preprocess").getOrCreate()

    input_truth = "s3://amazon-sagemaker-590183713013-us-west-2-testaudios/label0/"
    input_decep = "s3://amazon-sagemaker-590183713013-us-west-2-testaudios/label1/"
    output_root = "s3://amazon-sagemaker-590183713013-us-west-2-outputs/clusteroutput/"

    df = (spark.read.format("binaryFile")
          .option("recursiveFileLookup", "true")
          .load([input_truth, input_decep]))  # reads all files in both dirs

    out = (df.select(featurize_udf(F.col("path"), F.col("content")).alias("o"))
             .select("o.*"))

    # Parquet (preferred for ML + schema)
    (out.coalesce(64)  # tune for your cluster
        .write.mode("overwrite")
        .parquet(output_root + "audio_mfcc_meanstd_parquet/"))

    # If you also want a single CSV (small datasets only):
    # (out.coalesce(1)
    #     .write.mode("overwrite")
    #     .option("header", "true")
    #     .csv(output_root + "audio_mfcc_meanstd_csv/"))

    spark.stop()
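As a side note, the label rule in featurize_udf checks for "/deceptive/" in the path, while the input prefixes end in label0/ and label1/, so unless the audio files sit under a /deceptive/ subfolder every row would get label 0. A minimal sketch of that rule, testable without Spark (label_from_path is an illustrative name of mine, and matching "/label1/" instead is an assumption):

```python
# Hedged sketch of the path-based labeling used in featurize_udf.
# "/deceptive/" is the marker from the script above; passing "/label1/"
# instead is an assumed alternative matching the input prefixes.
def label_from_path(path, marker="/deceptive/"):
    return 1 if marker in path.lower() else 0
```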
My step settings set the type to Spark Application and the deploy mode to cluster.
Whenever I execute the job, I get the error Exception in thread "main" java.io.FileNotFoundException: No such file or directory: s3://amazon-sagemaker-590183713013-us-west-2-8679f28a966a/dzd_6eb7h2rapff7bk/4i74cf4mlgegsw/shared/scripts/preprocess_audio.py
However, my preprocess_audio.py is placed correctly in the bucket.
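One quick way to confirm is to check, with the same role the cluster uses, the exact key that Spark is asked to download. A minimal sketch assuming boto3 is available and credentials are configured (split_s3_uri and object_exists are illustrative names of mine):

```python
# Hedged sketch: verify the exact S3 object Spark tries to fetch.
from urllib.parse import urlparse

def split_s3_uri(uri):
    # "s3://bucket/some/key.py" -> ("bucket", "some/key.py")
    p = urlparse(uri)
    return p.netloc, p.path.lstrip("/")

def object_exists(uri):
    import boto3  # assumes boto3 is installed and credentials are set
    bucket, key = split_s3_uri(uri)
    try:
        # HeadObject needs s3:GetObject on this key
        boto3.client("s3").head_object(Bucket=bucket, Key=key)
        return True
    except Exception:
        return False
```

Note that head_object failing with 403 (rather than 404) would point at a missing s3:GetObject permission rather than a missing file.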
My error logs are the following:
25/09/09 01:24:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/09 01:24:32 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-172-31-34-184.us-west-2.compute.internal/172.31.34.184:8032
25/09/09 01:24:33 INFO Configuration: resource-types.xml not found
25/09/09 01:24:33 INFO ResourceUtils: Unable to find 'resource-types.xml'.
25/09/09 01:24:33 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
25/09/09 01:24:33 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
25/09/09 01:24:33 INFO Client: Setting up container launch context for our AM
25/09/09 01:24:33 INFO Client: Setting up the launch environment for our AM container
25/09/09 01:24:33 INFO Client: Preparing resources for our AM container
25/09/09 01:24:33 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/09/09 01:24:35 INFO Client: Uploading resource file:/mnt/tmp/spark-62b604da-701a-46bc-bca3-5f2c612d237c/__spark_libs__17547430653657766119.zip -> hdfs://ip-172-31-34-184.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1757380473926_0002/__spark_libs__17547430653657766119.zip
25/09/09 01:24:36 INFO Client: Uploading resource file:/etc/spark/conf.dist/hive-site.xml -> hdfs://ip-172-31-34-184.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1757380473926_0002/hive-site.xml
25/09/09 01:24:36 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-172-31-34-184.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1757380473926_0002/hudi-defaults.conf
25/09/09 01:24:36 INFO EMRFSToS3AConfigMapping: Mapping EMRFS config fs.s3.sts.endpoint to S3A config fs.s3a.assumed.role.sts.endpoint
25/09/09 01:24:37 INFO MetricsConfig: Loaded properties from hadoop-metrics2.properties
25/09/09 01:24:37 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 300 second(s).
25/09/09 01:24:37 INFO MetricsSystemImpl: s3a-file-system metrics system started
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/09/09 01:24:37 INFO Client: Uploading resource s3://amazon-sagemaker-590183713013-us-west-2-8679f28a966a/dzd_6eb7h2rapff7bk/4i74cf4mlgegsw/shared/scripts/preprocess_audio.py -> hdfs://ip-172-31-34-184.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1757380473926_0002/preprocess_audio.py
25/09/09 01:24:38 INFO Client: Deleted staging directory hdfs://ip-172-31-34-184.us-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1757380473926_0002
Exception in thread "main" java.io.FileNotFoundException: No such file or directory: s3://amazon-sagemaker-590183713013-us-west-2-8679f28a966a/dzd_6eb7h2rapff7bk/4i74cf4mlgegsw/shared/scripts/preprocess_audio.py
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:4425)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:4279)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$22(S3AFileSystem.java:4256)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:3157)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:3176)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:4254)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:431)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:370)
at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:489)
at org.apache.spark.deploy.yarn.Client.distribute$1(Client.scala:585)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:756)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:1029)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:241)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1370)
at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1818)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1150)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:200)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:223)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:92)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1246)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1255)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
25/09/09 01:24:38 INFO ShutdownHookManager: Shutdown hook called
25/09/09 01:24:38 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-62b604da-701a-46bc-bca3-5f2c612d237c
25/09/09 01:24:38 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-d7aef969-3e47-4d31-96de-1c9753e956b6
25/09/09 01:24:38 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
25/09/09 01:24:38 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
25/09/09 01:24:38 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
Command exiting with ret '1'
Can anyone please help?
Thanks
This could be because you are missing the s3:GetObject permission for the EMR cluster's role. It looks like a similar issue to this previous post:
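For reference, a hedged sketch of an IAM policy statement granting the missing read access (the bucket name is taken from your error message; adjust the resources to your setup):

```json
{
  "Effect": "Allow",
  "Action": ["s3:GetObject", "s3:ListBucket"],
  "Resource": [
    "arn:aws:s3:::amazon-sagemaker-590183713013-us-west-2-8679f28a966a",
    "arn:aws:s3:::amazon-sagemaker-590183713013-us-west-2-8679f28a966a/*"
  ]
}
```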