apache-sparkamazon-s3ibm-cloudcloud-object-storagestocator

Spark-submit with Stocator failing with Class com.ibm.stocator.fs.ObjectStoreFileSystem not found error


I am trying to run spark-submit wordcount Python on a Kubernetes cluster by pulling a text file stored in COS.

For the config, I followed the Stocator README.md

./bin/spark-submit \
    --master k8s://https://c111.us-south.containers.cloud.ibm.com:32206 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi --packages com.ibm.stocator:stocator:1.1.3 \
    --conf spark.executor.instances=5 --conf spark.hadoop.fs.cos.myobjectstorage.access.key= --conf spark.hadoop.fs.cos.myobjectstorage.secret.key= --conf spark.hadoop.fs.stocator.scheme.list=cos --conf spark.hadoop.fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem --conf spark.hadoop.fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient --conf spark.hadoop.fs.stocator.cos.scheme=cos --conf spark.jars.ivy=/tmp/.ivy\
    --conf spark.kubernetes.container.image=us.icr.io/mods15/spark-py:v1 --conf spark.hadoop.fs.cos.myobjectstorage.endpoint=http://s3.us.cloud-object-storage.appdomain.cloud --conf spark.hadoop.fs.cos.myobjectstorage.v2.signer.type=false --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark local:///opt/spark/examples/src/main/python/wordcount.py cos://vmac-code-engine-bucket.myobjectstorage/book.txt

I can see the driver and executors pods spinning up and after a couple of minutes the driver errors-out with the log below.

Driver stacktrace:
21/01/12 11:52:55 INFO DAGScheduler: Job 0 failed: collect at /opt/spark/examples/src/main/python/wordcount.py:40, took 7.839348 s
Traceback (most recent call last):
  File "/opt/spark/examples/src/main/python/wordcount.py", line 40, in <module>
    output = counts.collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 889, in collect
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.30.43.123, executor 4): java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.ibm.stocator.fs.ObjectStoreFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2197)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:84)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat.$anonfun$readToUnsafeMem$1(TextFileFormat.scala:119)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:147)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Caused by: java.lang.ClassNotFoundException: Class com.ibm.stocator.fs.ObjectStoreFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
        ... 26 more

Any idea on how I can make this work? I want to pass the text file stored in COS to the wordcount Python example that comes with Spark download (examples folder)

I am using Spark-3.0.1-hadoop2.7 and for the container images, I followed the documentation here


Solution

  • The part that is failing here is

    local:///opt/spark/examples/src/main/python/wordcount.py cos://vmac-code-engine-bucket.myobjectstorage/book.txt

    For some reason, the wordcount.py is not able to pick the book.txt file in the COS.

    Moving the cos file call inside the python file as mentioned in the link here solved the issue

    from pyspark import SparkContext
    sc = SparkContext("local", "count app")
    sonnets = sc.textFile("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/files")
    counts = sonnets.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1,v2: v1 + v2)
    counts.saveAsTextFile("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/files/wordcount-result")