python, pyspark, graphframes

Cannot set checkpoint dir when running Connected Components example


This is the Connected Components example from GraphFrames:

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

The documentation says:

NOTE: With GraphFrames 0.3.0 and later releases, the default Connected Components algorithm requires setting a Spark checkpoint directory. Users can revert to the old algorithm using connectedComponents.setAlgorithm("graphx").
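
(If I understand the docs correctly, the PySpark equivalent of the Scala setAlgorithm call is the algorithm keyword argument, but I have not tried that route:)

# Untested alternative from the note above: fall back to the GraphX-based
# algorithm, which should not need a checkpoint directory.
result = g.connectedComponents(algorithm="graphx")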

So this is my full code, connected.py, with setCheckpointDir:

import pyspark

sc = pyspark.SparkContext().getOrCreate()

sc.addPyFile("/home/username/.ivy2/jars/graphframes_graphframes-0.8.1-spark3.0-s_2.12.jar")

from graphframes.examples import Graphs

sc.setCheckpointDir("graphframes_cps")

g = Graphs(sqlContext).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

And I run it with this command:

spark-submit connected.py --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12

Then it returns this error:

Traceback (most recent call last):
  File "/home/username//test/spark/connected.py", line 11, in <module>
    sc.setCheckpointDir("graphframes_cps")
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 975, in setCheckpointDir
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.setCheckpointDir.

How can I fix this?


Solution

  • When running the Connected Components example from GraphFrames:

    from graphframes.examples import Graphs
    g = Graphs(sqlContext).friends()  # Get example graph
    
    result = g.connectedComponents()
    result.select("id", "component").orderBy("component").show()
    

    I get this error:

    java.io.IOException: Checkpoint directory is not set. Please set it first using sc.setCheckpointDir().
    

    That means I had not set the checkpoint directory yet, so I added this line:

    sc.setCheckpointDir(dirName="/home/username/graphframes_cps")
    
    result = g.connectedComponents()
    result.select("id", "component").orderBy("component").show()
    

    Then the error I got was:

    Traceback (most recent call last):
      File "/home/username//test/spark/connected.py", line 11, in <module>
        sc.setCheckpointDir("graphframes_cps")
      File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 975, in setCheckpointDir
      File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
      File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o19.setCheckpointDir.
    
    Py4JJavaError: An error occurred while calling o176.setCheckpointDir.
    : java.net.ConnectException: Call From huycomputer/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    

    There are other error lines below these that I had not noticed at first, but that is the root problem: I had not started HDFS, so PySpark could not connect to localhost:9000, which is the HDFS (NameNode) service port.

    So after running start-dfs.sh, it works as expected, but I still don't know how to use a local folder instead. In the HDFS web UI (localhost:9870) I can see the "/home/username/graphframes_cps" path filling up after running the example a few times.
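
    A workaround I have not verified: pass an explicit file:// URI to setCheckpointDir() so Spark writes the checkpoints to the local filesystem instead of the default HDFS one:

    # Unverified sketch: bypass the default HDFS filesystem by giving the
    # checkpoint directory as an explicit file:// URI.
    sc.setCheckpointDir("file:///home/username/graphframes_cps")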


    So this is my full code. I use a Jupyter notebook, which already starts a SparkContext, so I only need the sc variable to call setCheckpointDir():

    from graphframes.examples import Graphs
    g = Graphs(sqlContext).friends()  # Get example graph
    sc.setCheckpointDir(dirName="/home/dhuy237/graphframes_cps")
    
    result = g.connectedComponents()
    result.select("id", "component").orderBy("component").show()
    

    Output:

    +---+------------+
    | id|   component|
    +---+------------+
    |  b|412316860416|
    |  c|412316860416|
    |  e|412316860416|
    |  f|412316860416|
    |  d|412316860416|
    |  a|412316860416|
    +---+------------+
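
    For reference, outside Jupyter a standalone connected.py has to create the SparkSession and sqlContext itself before calling setCheckpointDir(). This is only a sketch, assuming HDFS is running and the graphframes package is passed to spark-submit via --packages before the script name so that import graphframes resolves:

    from pyspark.sql import SparkSession, SQLContext
    from graphframes.examples import Graphs

    # Reuse or create the SparkSession that the pyspark/Jupyter shells normally provide.
    spark = SparkSession.builder.appName("connected-components").getOrCreate()
    sc = spark.sparkContext

    # The examples helper expects a sqlContext; SQLContext is deprecated in
    # Spark 3 but still works here.
    sqlContext = SQLContext(sc)

    # Checkpoint path, resolved against the default filesystem (HDFS in this setup).
    sc.setCheckpointDir("/home/username/graphframes_cps")

    g = Graphs(sqlContext).friends()  # Get example graph

    result = g.connectedComponents()
    result.select("id", "component").orderBy("component").show()

    It would be submitted with something like spark-submit --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 connected.py (options go before the script name).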