I am trying to run the Python wordcount example via spark-submit on a Kubernetes cluster, pulling a text file stored in IBM Cloud Object Storage (COS).
For the configuration, I followed the Stocator README.md:
./bin/spark-submit \
  --master k8s://https://c111.us-south.containers.cloud.ibm.com:32206 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --packages com.ibm.stocator:stocator:1.1.3 \
  --conf spark.executor.instances=5 \
  --conf spark.hadoop.fs.cos.myobjectstorage.access.key= \
  --conf spark.hadoop.fs.cos.myobjectstorage.secret.key= \
  --conf spark.hadoop.fs.stocator.scheme.list=cos \
  --conf spark.hadoop.fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem \
  --conf spark.hadoop.fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient \
  --conf spark.hadoop.fs.stocator.cos.scheme=cos \
  --conf spark.jars.ivy=/tmp/.ivy \
  --conf spark.kubernetes.container.image=us.icr.io/mods15/spark-py:v1 \
  --conf spark.hadoop.fs.cos.myobjectstorage.endpoint=http://s3.us.cloud-object-storage.appdomain.cloud \
  --conf spark.hadoop.fs.cos.myobjectstorage.v2.signer.type=false \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/src/main/python/wordcount.py cos://vmac-code-engine-bucket.myobjectstorage/book.txt
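For reference, the same spark.hadoop.* settings can also be set from inside the script: Spark strips the spark.hadoop. prefix and copies the rest into the Hadoop Configuration that Stocator reads. A minimal sketch of that (the appName and the ACCESS_KEY/SECRET_KEY placeholders are mine, and the Stocator jar still has to reach the driver and executors, e.g. via --packages or by baking it into the image):

from pyspark.sql import SparkSession

# Same Stocator wiring as the --conf flags above, done programmatically.
# ACCESS_KEY / SECRET_KEY are placeholders, not real credentials.
spark = (
    SparkSession.builder
    .appName("stocator-config-sketch")
    .config("spark.hadoop.fs.stocator.scheme.list", "cos")
    .config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    .config("spark.hadoop.fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    .config("spark.hadoop.fs.stocator.cos.scheme", "cos")
    .config("spark.hadoop.fs.cos.myobjectstorage.endpoint",
            "http://s3.us.cloud-object-storage.appdomain.cloud")
    .config("spark.hadoop.fs.cos.myobjectstorage.access.key", "ACCESS_KEY")
    .config("spark.hadoop.fs.cos.myobjectstorage.secret.key", "SECRET_KEY")
    .getOrCreate()
)

# Spark hands every spark.hadoop.* entry (minus the prefix) to the Hadoop
# Configuration, which is what Stocator consults when resolving cos:// paths.
lines = spark.sparkContext.textFile("cos://vmac-code-engine-bucket.myobjectstorage/book.txt")
print(lines.count())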
I can see the driver and executor pods spinning up, and after a couple of minutes the driver errors out with the log below.
Driver stacktrace:
21/01/12 11:52:55 INFO DAGScheduler: Job 0 failed: collect at /opt/spark/examples/src/main/python/wordcount.py:40, took 7.839348 s
Traceback (most recent call last):
File "/opt/spark/examples/src/main/python/wordcount.py", line 40, in <module>
output = counts.collect()
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 889, in collect
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.30.43.123, executor 4): java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.ibm.stocator.fs.ObjectStoreFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2197)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:84)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
at org.apache.spark.sql.execution.datasources.text.TextFileFormat.$anonfun$readToUnsafeMem$1(TextFileFormat.scala:119)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:147)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Caused by: java.lang.ClassNotFoundException: Class com.ibm.stocator.fs.ObjectStoreFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
... 26 more
Any idea how I can make this work? I want to pass the text file stored in COS to the wordcount Python example that ships in the examples folder of the Spark download.
I am using spark-3.0.1-bin-hadoop2.7, and for the container images I followed the documentation here.
The part that is failing here is:
local:///opt/spark/examples/src/main/python/wordcount.py cos://vmac-code-engine-bucket.myobjectstorage/book.txt
For some reason, wordcount.py is not able to pick up the book.txt file from COS.
Moving the COS file read inside the Python file, as mentioned in the link here, solved the issue:
from pyspark import SparkContext

# Note: "local" runs the whole job inside the driver process rather than on
# the cluster executors.
sc = SparkContext("local", "count app")

sonnets = sc.textFile("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/files")
counts = (sonnets.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda v1, v2: v1 + v2))
counts.saveAsTextFile("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/files/wordcount-result")
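If you also want the counts printed on the driver (the bundled wordcount.py does this with the collect() at line 40 of the traceback above), the same lines can be appended to the snippet:

# Pull the (word, count) pairs back to the driver and print them,
# mirroring what the bundled wordcount.py does at line 40.
output = counts.collect()
for word, count in output:
    print("%s: %i" % (word, count))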