I want to run PageRank on a relatively large graph: 3.5 billion nodes and 90 billion edges. I have been experimenting with different cluster sizes to get it to run. But first the code:
from pyspark.sql import SparkSession
import graphframes
spark = SparkSession.builder.getOrCreate()
edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF = spark.read.parquet('s3://path/to/verts') # 25GB total size
graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()
result_df = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')
I experienced high garbage collection times right from the start, so I experimented with different settings and sizes for the cluster. I mainly followed two articles:
https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
I run the cluster on Amazon EMR. These are the relevant settings I currently use:
"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
"maximizeResourceAllocation": "true"
"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"
I experimented with cluster sizes. My first experiment that seemed to work was a cluster with the following parameters: --deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5
With this configuration GC time was way down and everything was working, but since it was a test cluster it had very "little" memory, 2.7 TB in total. Also, after a while I got ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node, Exit status: 137.
I thought this happened because I left the node too little RAM. So I reran the whole thing, but this time with --executor-cores 5 --executor-memory 35g, and right away my GC problems were back and my cluster acted really weird. So I thought I understood the problem: the reason for the high GC times was too little memory per executor.
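For what it's worth, the container YARN allocates per executor is a bit larger than --executor-memory because of the memory overhead (in Spark 2.x the default is max(384 MB, 10% of executor memory)), so a back-of-the-envelope sketch of the container sizes I was requesting looks roughly like this, assuming that default overhead:

# Rough YARN container sizing per executor, assuming the Spark 2.x default
# memoryOverhead of max(384 MB, 0.10 * executor memory).
def container_size_gb(executor_memory_gb, overhead_fraction=0.10):
    overhead_gb = max(0.384, overhead_fraction * executor_memory_gb)
    return executor_memory_gb + overhead_gb

for mem_gb in (36, 35, 45, 75):
    print(mem_gb, 'g heap ->', round(container_size_gb(mem_gb), 1), 'GB per container')
# 36 g heap -> 39.6 GB, 35 g heap -> 38.5 GB, 45 g heap -> 49.5 GB, 75 g heap -> 82.5 GB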
The next cluster I spun up had the following parameters: --deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5
So a larger cluster and even more memory per executor than before. Everything was running smoothly, and I noticed via Ganglia that the first step took about 5.5 TB of RAM.
I thought I understood the issue: using fewer of my cluster's cores and enlarging the memory of each executor makes the program faster. I guessed that it has to do with verts_DF being about 25 GB in size; this way it would fit into the memory of each executor and leave room for the calculations (25 GB * 179 is roughly 4.5 TB, in the same ballpark as the 5.5 TB I observed).
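To make that concrete, here is the same back-of-the-envelope math for the total executor memory of the configurations mentioned so far (driver memory and YARN overhead ignored):

# Total executor heap across the cluster for each configuration tried
# (1 TB = 1000 GB for simplicity).
configs = {'75 x 36g': (75, 36), '179 x 45g': (179, 45), '119 x 75g': (119, 75)}
for name, (num_executors, mem_gb) in configs.items():
    print(name, '->', num_executors * mem_gb / 1000, 'TB total executor memory')
# 75 x 36g  -> 2.7 TB
# 179 x 45g -> ~8.1 TB
# 119 x 75g -> ~8.9 TB
print(179 * 25 / 1000, 'TB if verts_DF (~25 GB) were held once per executor')  # ~4.5 TB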
So the next cluster I spun up had the same number of nodes, but I resized the executors to: --num-executors 119 --executor-cores 5 --executor-memory 75g
Instantly all the problems were back! High GC times, the cluster was hanging, and via Ganglia I could see the RAM filling up to 8 of the 9 available TB. I was baffled.
I went back and spun up the --num-executors 179 --executor-cores 5 --executor-memory 45g cluster again, which luckily is easy to do with EMR because I could just clone it. But now this configuration did not work either: high GC times and the cluster hitting 8 TB of used memory right away.
What is going on here? It feels like I'm playing roulette: sometimes the same config works and other times it does not.
If someone still stumbles upon this after some time has passed: I realized that the problem lies with how graphx or graphframes load the graph. Both try to generate all triplets of the graph they are loading, which with very large graphs results in OOM errors, because a graph with 3.5 billion nodes and 70 billion edges has damn many of them.
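To make that concrete: GraphFrames exposes this triplet view as a DataFrame, and it is essentially every edge joined with the full attribute rows of both of its endpoints, so anything shaped like it has one wide row per edge:

# graph_GDF is the GraphFrame built in the question. Its triplet view joins
# each edge with the attributes of its source and destination vertex, so it
# has one wide row per edge -- tens of billions of rows for this graph.
triplets_df = graph_GDF.triplets   # columns: src, edge, dst (structs)
triplets_df.printSchema()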
I wrote a solution by implementing PageRank in pyspark. It is certainly not as fast as a scala implementation, but it scales and does not run into the described triplet problem.
I published it on GitHub: https://github.com/thagorx/spark_pagerank
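For anyone who wants the gist without reading the repo: the idea is just iterated joins and aggregations over the edge DataFrame, with no triplet materialization. Below is a minimal, simplified sketch of that approach, not the code from the repo; the column names (src, dst, id), the fixed iteration count and the checkpoint path are assumptions, and dangling-node handling is left out.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir('s3://path/to/checkpoints')  # placeholder path

edges = spark.read.parquet('s3://path/to/edges')  # assumed columns: src, dst
verts = spark.read.parquet('s3://path/to/verts')  # assumed column: id

alpha = 0.15        # reset probability
iterations = 10     # fixed number of iterations instead of a tolerance

# number of outgoing edges per source vertex
out_deg = edges.groupBy('src').agg(F.count('*').alias('out_deg'))

# start every vertex with rank 1.0
ranks = verts.select('id', F.lit(1.0).alias('rank'))

for i in range(iterations):
    # every vertex sends rank / out_degree along each of its outgoing edges
    contribs = (edges
                .join(ranks, edges.src == ranks.id)
                .join(out_deg, 'src')
                .select(F.col('dst').alias('id'),
                        (F.col('rank') / F.col('out_deg')).alias('contrib')))
    sums = contribs.groupBy('id').agg(F.sum('contrib').alias('in_rank'))
    # damping; vertices without incoming edges fall back to the reset probability
    ranks = (verts.join(sums, 'id', 'left')
             .select('id',
                     (F.lit(alpha) + (1 - alpha) * F.coalesce(F.col('in_rank'), F.lit(0.0))).alias('rank')))
    if i % 3 == 2:
        ranks = ranks.checkpoint()  # cut the lineage so the query plan does not keep growing

ranks.write.parquet('s3://path/to/output', mode='overwrite')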