Tags: pyspark, amazon-emr, aws-serverless, graphframes, emr-serverless

How to use GraphFrames on EMR Serverless


Summary of steps executed:

  1. Uploaded the Python script to S3.
  2. Created a virtualenv that installs graphframes, packed it, and uploaded the archive to S3.
  3. Added a VPC to my EMR Serverless application.
  4. Added the graphframes package to the Spark conf.
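
The pieces were wired together at submit time. For context, here is a sketch of the submission with boto3 (application ID, role ARN, region, bucket, and script name are placeholders, not the real values):

import boto3

client = boto3.client("emr-serverless", region_name="us-east-1")  # placeholder region

response = client.start_job_run(
    applicationId="00exampleapplid",  # placeholder application ID
    executionRoleArn="arn:aws:iam::123456789012:role/emr-serverless-job-role",  # placeholder
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://bucket_name/script.py",  # the script from the annex below
            "sparkSubmitParameters": (
                "--conf spark.jars.packages=graphframes:graphframes:0.8.2-spark3.2-s_2.12 "
                "--conf spark.archives=s3://bucket_name/venv.tar.gz#environment "
                "--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python "
                "--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python "
                "--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
            ),
        }
    },
)
print(response["jobRunId"])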

The error message was:

22/09/11 18:44:49 INFO Utils: /home/hadoop/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar has been previously copied to /tmp/spark-d0c75876-b210-4ce2-b2f8-cd65e59d00db/userFiles-1aef56b8-1c39-4158-b9a2-ee810a844314/org.slf4j_slf4j-api-1.7.16.jar
22/09/11 18:44:49 INFO Executor: Fetching file:/tmp/spark-ae6aefff-23d3-447e-a743-5ed41621fad0/pyspark_ge.tar.gz#environment with timestamp 1662921887645
22/09/11 18:44:49 INFO Utils: Copying /tmp/spark-ae6aefff-23d3-447e-a743-5ed41621fad0/pyspark_ge.tar.gz to /tmp/spark-7f9cf09a-dc6e-41e0-b187-61f1b5a80670/pyspark_ge.tar.gz
22/09/11 18:44:49 INFO Executor: Unpacking an archive file:/tmp/spark-ae6aefff-23d3-447e-a743-5ed41621fad0/pyspark_ge.tar.gz#environment from /tmp/spark-7f9cf09a-dc6e-41e0-b187-61f1b5a80670/pyspark_ge.tar.gz to /tmp/spark-d0c75876-b210-4ce2-b2f8-cd65e59d00db/userFiles-1aef56b8-1c39-4158-b9a2-ee810a844314/environment
22/09/11 18:44:49 INFO Executor: Fetching spark://[2600:1f18:61b4:c700:3016:ab0c:287a:ccaf]:34507/jars/org.slf4j_slf4j-api-1.7.16.jar with timestamp 1662921887645
22/09/11 18:44:50 ERROR Utils: Aborting task
java.io.IOException: Failed to connect to /2600:1f18:61b4:c700:3016:ab0c:287a:ccaf:34507
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)

Could anybody point me to a solution/documentation that helps with this problem?

ANNEX
I also tried adding just the graphframes package (without the virtualenv), but then got a "numpy not found" error, presumably because spark.jars.packages only pulls in the JAR and its bundled Python bindings, while numpy itself is not installed on the default image.

More detailed steps

  1. Python script:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from graphframes import GraphFrame

print('job started')

conf = (SparkConf()
    .setMaster("local")
    .setAppName("GraphDataFrame")
    .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

if __name__ == "__main__":
    # Create a Vertex DataFrame with unique ID column "id"

    v = sqlContext.createDataFrame([
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
        ], ["id", "name", "age"])

    # Create an Edge DataFrame with "src" and "dst" columns
    e = sqlContext.createDataFrame([
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ], ["src", "dst", "relationship"])

    # Create a GraphFrame
    g = GraphFrame(v, e)

    # Query: Get in-degree of each vertex.
    g.inDegrees.show()

    # Query: Count the number of "follow" connections in the graph.
    print(g.edges.filter("relationship = 'follow'").count())

    # Run PageRank algorithm, and show results.
    results = g.pageRank(resetProbability=0.01, maxIter=20)
    results.vertices.select("id", "pagerank").show()

    sc.stop()
  2. Dockerfile that builds the virtualenv, which is packed and copied to S3:
FROM --platform=linux/amd64 amazonlinux:2 AS base
RUN yum install -y python3
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install venv-pack==0.2.0
RUN python3 -m pip install graphframes
...
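
The tail of the Dockerfile is elided above; based on the pattern in the AWS emr-serverless-samples repository, the missing steps typically pack the venv with venv-pack and export the archive, roughly like this (a sketch, not necessarily the exact file used here):

# Hypothetical tail, following the AWS emr-serverless-samples pattern:
RUN mkdir /output && venv-pack -o /output/venv.tar.gz

FROM scratch AS export
COPY --from=base /output/venv.tar.gz /

Building with DOCKER_BUILDKIT=1 docker build --output . . then drops venv.tar.gz into the working directory, ready for the S3 upload.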
  3. Added a default generated VPC with a NAT gateway.
  4. Spark conf adding the graphframes package:
--conf spark.jars.packages=graphframes:graphframes:0.8.2-spark3.2-s_2.12
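
For reference, the same package can also be requested from inside the script when the session is built, which is equivalent to the --conf flag above (a sketch; the setting only takes effect if applied before the session is created):

from pyspark.sql import SparkSession

# Must be configured before the first session/context exists,
# or the ivy resolution never runs.
spark = (
    SparkSession.builder
    .appName("GraphDataFrame")
    .config("spark.jars.packages",
            "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    .getOrCreate()
)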
  5. Spark conf for the virtualenv:
--conf spark.archives=s3://bucket_name/venv.tar.gz#environment  
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python  
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python  
--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python  
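
To verify that the driver and executors actually run from the packed environment rather than the image's default Python, a small check like this can be submitted as a job (a debugging sketch, not part of the original script):

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EnvCheck").getOrCreate()
sc = spark.sparkContext

# Expect paths ending in environment/bin/python when spark.archives was unpacked.
print("driver python:", sys.executable)
print("executor pythons:",
      sc.parallelize(range(2), 2)
        .map(lambda _: __import__("sys").executable)
        .distinct()
        .collect())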

Solution

  • Found the solution: we can't build the context the way I did. Hard-coding the master and executor memory in code takes precedence over what EMR Serverless injects at submit time, which is presumably why the executors could not reach the driver. This doesn't work:

    conf = (SparkConf()
        .setMaster("local")
        .setAppName("GraphDataFrame")
        .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    

    Use this instead:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("HelloGraphFrame").getOrCreate()
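
    Putting it together, the script from the question boils down to this shape (same vertices and edges as above; EMR Serverless supplies the master and resource settings at submit time):

    from pyspark.sql import SparkSession
    from graphframes import GraphFrame

    if __name__ == "__main__":
        # No setMaster or executor memory here; EMR Serverless injects those.
        spark = SparkSession.builder.appName("GraphDataFrame").getOrCreate()

        v = spark.createDataFrame(
            [("a", "Alice", 34), ("b", "Bob", 36), ("c", "Charlie", 30)],
            ["id", "name", "age"])
        e = spark.createDataFrame(
            [("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow")],
            ["src", "dst", "relationship"])

        g = GraphFrame(v, e)
        g.inDegrees.show()

        spark.stop()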