python-3.xpysparkanacondabigdata

Py4JJavaError: An error occurred while calling o37.showString. Spark & anaconda3


I am a student I am really stuck with this problem of Py4JJavaError for two weeks, on the internet there is not much; I really need help:

I follow this tutorial :https://learn.microsoft.com/fr-fr/azure/hdinsight/spark/apache-spark-machine-learning-mllib-ipython

when I retrieve a line from the RDD in order to be able to observe the data schema like inspections.take(1) or df.show(5) I come across this error

> Py4JJavaError                             Traceback (most recent call
> last) <ipython-input-13-eb589bae8d4b> in <module>()
> ----> 1 df.show(5)
> 
> ~/anaconda3/lib/python3.6/site-packages/pyspark/sql/dataframe.py in
> show(self, n, truncate, vertical)
>     376         """
>     377         if isinstance(truncate, bool) and truncate:
> --> 378             print(self._jdf.showString(n, 20, vertical))
>     379         else:
>     380             print(self._jdf.showString(n, int(truncate), vertical))
> 
> ~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in
> __call__(self, *args)    1255         answer = self.gateway_client.send_command(command)    1256         return_value
> = get_return_value(
> -> 1257             answer, self.gateway_client, self.target_id, self.name)    1258     1259         for temp_arg in temp_args:
> 
> ~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in
> deco(*a, **kw)
>      61     def deco(*a, **kw):
>      62         try:
> ---> 63             return f(*a, **kw)
>      64         except py4j.protocol.Py4JJavaError as e:
>      65             s = e.java_exception.toString()
> 
> ~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in
> get_return_value(answer, gateway_client, target_id, name)
>     326                 raise Py4JJavaError(
>     327                     "An error occurred while calling {0}{1}{2}.\n".
> --> 328                     format(target_id, ".", name), value)
>     329             else:
>     330                 raise Py4JError(
> 
> Py4JJavaError: An error occurred while calling o37.showString. :
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0
> in stage 0.0 (TID 0, localhost, executor driver):
> org.apache.spark.api.python.PythonException: Traceback (most recent
> call last):   File
> "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py",
> line 372, in main
>     process()   File "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py",
> line 367, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)   File
> "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py",
> line 390, in dump_stream
>     vs = list(itertools.islice(iterator, batch))   File "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py",
> line 100, in wrapper
>     return f(*args, **kwargs)   File "<ipython-input-10-9aa45565a8c1>", line 3, in csvParse
> ModuleNotFoundError: No module named 'StringIO'
> 
>   at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
>   at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
>   at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
>   at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>   at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)   at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)    at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)    at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)    at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)   at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>   at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>   at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>   at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)     at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)    at
> org.apache.spark.scheduler.Task.run(Task.scala:121)   at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Driver stacktrace:    at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
>   at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>   at scala.Option.foreach(Option.scala:257)   at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)    at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)     at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)     at
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
>   at
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>   at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
>   at
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
>   at
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
>   at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
>   at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>   at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>   at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)  at
> org.apache.spark.sql.Dataset.head(Dataset.scala:2545)     at
> org.apache.spark.sql.Dataset.take(Dataset.scala:2759)     at
> org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)   at
> org.apache.spark.sql.Dataset.showString(Dataset.scala:292)    at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)     at
> py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)  at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)    at
> py4j.Gateway.invoke(Gateway.java:282)     at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)   at
> py4j.GatewayConnection.run(GatewayConnection.java:238)    at
> java.lang.Thread.run(Thread.java:745) Caused by:
> org.apache.spark.api.python.PythonException: Traceback (most recent
> call last):   File
> "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py",
> line 372, in main
>     process()   File "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py",
> line 367, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)   File
> "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py",
> line 390, in dump_stream
>     vs = list(itertools.islice(iterator, batch))   File "/Users/sabbar/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py",
> line 100, in wrapper
>     return f(*args, **kwargs)   File "<ipython-input-10-9aa45565a8c1>", line 3, in csvParse
> ModuleNotFoundError: No module named 'StringIO'
> 
>   at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
>   at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
>   at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
>   at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>   at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)   at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)    at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)    at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)    at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)   at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>   at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>   at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>   at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)     at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)    at
> org.apache.spark.scheduler.Task.run(Task.scala:121)   at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more

here is the code :

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import pyspark 
#from pyspark import SparkContext
#sc = SparkContext("local", "Simple App")
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from py4j.protocol import Py4JJavaError

def csvParse(s):
    import csv
    from StringIO import StringIO
    sio = StringIO(s)
    value = csv.reader(sio).next()
    sio.close()
    return value

inspections = sc.textFile('Chicago_Street_Names.csv').map(csvParse)

inspections.take(1)

Please help me this is project to make next week


Solution

  • As @pault suggested in comments, you don't need to write your own function to parse simple csv files. You can use sc.read.csv(FILEPATH).

    If you want to proceed with your function as is, then you can replace from StringIO import StringIO with from io import StringIO. StringIO package has been replaced with io package in newer Python 3 version.