
java.lang.AbstractMethodError: com/ibm/stocator/fs/common/IStoreClient.setStocatorPath(Lcom/ibm/stocator/fs/common/StocatorPath;)V


I'm trying to access data on IBM COS from Data Science Experience based on this blog post.

First, I installed version 1.0.8 of Stocator:

!pip install --user --upgrade pixiedust
import pixiedust
pixiedust.installPackage("com.ibm.stocator:stocator:1.0.8")

I restarted the kernel, then ran:

access_key = 'xxxx'
secret_key = 'xxxx'
bucket = 'xxxx'
host = 'lon.ibmselect.objstor.com'

hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3d.service.endpoint", "http://" + host)
hconf.set("fs.s3d.service.access.key", access_key)
hconf.set("fs.s3d.service.secret.key", secret_key)

file = 'mydata_file.tsv.gz'

inputDataset = "s3d://{}.service/{}".format(bucket, file)

lines = sc.textFile(inputDataset, 1)
lines.count()

However, that results in the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.AbstractMethodError: com/ibm/stocator/fs/common/IStoreClient.setStocatorPath(Lcom/ibm/stocator/fs/common/StocatorPath;)V
    at com.ibm.stocator.fs.ObjectStoreFileSystem.initialize(ObjectStoreFileSystem.java:104)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:249)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:249)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:249)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:249)
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:53)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:249)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:249)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:932)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:378)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:931)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:507)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:785)

Note: My first attempt at connecting to IBM COS resulted in a different error. That attempt is captured here: No FileSystem for scheme: cos


Solution

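  • Don't force-install a new Stocator unless you have a really good reason. An AbstractMethodError means classes compiled against different versions of an interface are being mixed at runtime, so the jar you installed is most likely clashing with a Stocator version the Spark service already provides.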

    I highly recommend the Spark aaS documentation at:

    https://console.bluemix.net/docs/services/AnalyticsforApacheSpark/index-gentopic1.html#genTopProcId2

    Please choose the correct COS endpoints from:

    https://ibm-public-cos.github.io/crs-docs/endpoints

    and PLEASE use the private endpoints if you're working from within the IBM Cloud. They will be much faster and cheaper.

    The Spark documentation linked above has examples of how to access COS data using the helper libraries. It boils down to:

    import ibmos2spark
    
    credentials = {
      'endpoint': 's3-api.us-geo.objectstorage.service.networklayer.com',  # just an example; your endpoint might be different
      'access_key': 'my access key',
      'secret_key': 'my secret key'
    }
    bucket_name = 'my bucket name'
    object_name = 'mydata_file.tsv.gz'
    
    # Let the helper set up the connection and build the object URL
    cos = ibmos2spark.CloudObjectStorage(sc, credentials)
    lines = sc.textFile(cos.url(object_name, bucket_name), 1)
    lines.count()
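
If you have already installed a Stocator jar yourself, one way to check for a version clash is to list the Stocator jars in the directories your Spark environment loads from. The following is only a minimal sketch: the function name `find_stocator_jars` is hypothetical, it assumes jar files follow the usual Maven-style `stocator-<version>.jar` naming, and which directories to pass in depends on your environment (I'm not assuming any particular location here).

```python
import os
import re

# Assumed naming pattern: Maven-style jar names such as "stocator-1.0.8.jar",
# optionally with a qualifier after the version (e.g. "-SNAPSHOT").
_STOCATOR_JAR = re.compile(r"^stocator-(\d+(?:\.\d+)*)[-.\w]*\.jar$")


def find_stocator_jars(directories):
    """Return a sorted list of (version, path) for Stocator jars in the given directories.

    `directories` is whatever list of jar locations applies to your setup;
    nothing here knows where your environment actually keeps its jars.
    """
    found = []
    for directory in directories:
        if not os.path.isdir(directory):
            continue  # skip locations that do not exist in this environment
        for name in os.listdir(directory):
            match = _STOCATOR_JAR.match(name)
            if match:
                found.append((match.group(1), os.path.join(directory, name)))
    return sorted(found)
```

If this returns more than one version, that would be consistent with the kind of clash the AbstractMethodError points to, and removing the jar you installed yourself is the first thing I'd try.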