I am running on a Dataproc managed Spark cluster.
My cluster configuration is as follows:
Necessary imports:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
I start the SparkSession as follows (note the change to maxPartitionBytes):
spark = SparkSession.builder.\
config("spark.executor.cores","15").\
config("spark.executor.instances","2").\
config("spark.executor.memory","12100m").\
config("spark.dynamicAllocation.enabled", False).\
config("spark.sql.adaptive.enabled", False).\
config("spark.sql.files.maxPartitionBytes","10g").\
getOrCreate()
I have a CSV file that takes up ~40GiB on disk.
I read it in and cache it with the following:
df_covid = spark.read.csv("gs://xxxxxxxx.appspot.com/spark_datasets/covid60g.csv",
header=True, inferSchema=False)
df_covid.cache()
df_covid.count()
df_covid.rdd.getNumPartitions()
#output: 30
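The 30 partitions can be reproduced with a little arithmetic. The following is a rough sketch of how I understand Spark sizes file splits: the split size is the smaller of maxPartitionBytes and the "bytes per core" figure, with spark.sql.files.openCostInBytes as a floor. The 40 GiB file size is rounded, the default parallelism of 30 is my assumption (2 executors x 15 cores), the 4 MiB open cost is the Spark default, and the function name is made up for the example.

```python
import math

GIB = 1024 ** 3
MIB = 1024 ** 2


def estimate_partitions(file_bytes, max_partition_bytes, default_parallelism,
                        open_cost_bytes=4 * MIB):
    """Approximate the number of input partitions for a single splittable file."""
    # Spark pads every file with the open cost before dividing by the parallelism.
    total_bytes = file_bytes + open_cost_bytes
    bytes_per_core = total_bytes // default_parallelism
    # The split size is capped by maxPartitionBytes and floored by the open cost.
    max_split_bytes = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))
    return math.ceil(file_bytes / max_split_bytes)


# Assumed inputs: ~40 GiB file, maxPartitionBytes = 10g, 2 executors x 15 cores.
partitions = estimate_partitions(40 * GIB, 10 * GIB, default_parallelism=30)
print(partitions)  # 30
```

Under these assumptions the 10g cap never applies: the per-core share of roughly 1.33 GiB is the smaller value, so the file is cut into one split per core.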
After that, the Storage tab shows:
10.3 GiB deserialized in memory and 3.9 serialized on disk
Now I want to check the CPU usage reported by YARN and compare it with the htop results on the individual workers.
I use the following transformations:
@udf(returnType=StringType())
def f1(x):
    out = ''
    for i in x:
        out += chr(ord(i)+1)
    return out
@udf(returnType=StringType())
def f2(x):
    out = ''
    for i in x:
        out += chr(ord(i)-1)
    return out
df_covid = df_covid.withColumn("_catted", F.concat_ws('',*df_covid.columns))
for i in range(10):
    df_covid = df_covid.withColumn("_catted", f1(F.col("_catted")))
    df_covid = df_covid.withColumn("_catted", f2(F.col("_catted")))
df_covid = df_covid.withColumn("esize1", F.length(F.split("_catted", "e").getItem(1)))
df_covid = df_covid.withColumn("asize1", F.length(F.split("_catted", "a").getItem(1)))
df_covid = df_covid.withColumn("isize1", F.length(F.split("_catted", "i").getItem(1)))
df_covid = df_covid.withColumn("nsize1", F.length(F.split("_catted", "n").getItem(1)))
df_covid = df_covid.filter((df_covid.esize1 > 5) & (df_covid.asize1 > 5) & (df_covid.isize1 > 5) & (df_covid.nsize1 > 5))
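To make clear what the two UDFs do, here is a plain-Python sketch of the same character shifts, without Spark. The names shift_up and shift_down are stand-ins for f1 and f2, and the sample row is invented. Since the second shift undoes the first, the ten loop iterations leave the string unchanged and exist only to burn CPU.

```python
def shift_up(s: str) -> str:
    """Same logic as f1: move every character one code point up."""
    return ''.join(chr(ord(c) + 1) for c in s)


def shift_down(s: str) -> str:
    """Same logic as f2: move every character one code point down."""
    return ''.join(chr(ord(c) - 1) for c in s)


# Invented sample standing in for one concatenated row.
row = "2020-03-01Italy1694"

value = row
for _ in range(10):
    # One loop iteration of the job: f1 followed by f2.
    value = shift_down(shift_up(value))

print(shift_up("abc"))  # bcd
print(value == row)     # True
```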
Now I call an action to start the computations:
df_covid.count()
I monitor htop on my two worker nodes. About a minute after I call the action, htop on both workers shows all cores fully utilized, and they stay fully utilized for about 3-4 minutes.
As the load averages in the images show, the cores are going full tilt and all 16 cores are completely utilized. The uptime in the screenshots also shows that the cores are fully utilized for well over 2 minutes; in practice they stay busy for 3+ minutes.
My issue is that the CPU utilization in the YARN metrics on Dataproc monitoring does not agree with this. The following are the CPU utilization charts from the same time:
which shows a maximum CPU usage of ~70%.
What is the reason for the discrepancy between the YARN monitoring and htop? I have seen YARN CPU utilization go above 90% for other people, and a quick Google search shows the same. How is that achieved?
Spark's fixed costs take up a significant proportion of the tiny cluster that I was running my queries on. After scaling the cluster up to 12 worker nodes of the same configuration, the CPU usage reached 93.5%.
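As a rough illustration of why a fixed cost weighs less on a larger cluster, here is a back-of-the-envelope sketch. It assumes, purely hypothetically, that the chart averages CPU over a pool containing one mostly idle node of the same size as a worker; I have not confirmed that this is how the Dataproc chart is computed. The function name and parameters are made up for the example.

```python
def average_utilization(busy_nodes: int, idle_nodes: int = 1) -> float:
    """Average CPU utilization when busy nodes run at 100% and idle nodes at 0%.

    Hypothetical model: every node has the same number of cores, so the
    average is just the share of busy nodes in the pool.
    """
    return busy_nodes / (busy_nodes + idle_nodes)


for workers in (2, 12):
    pct = round(100 * average_utilization(workers), 1)
    print(workers, "workers:", pct, "%")
# 2 workers: 66.7 %
# 12 workers: 92.3 %
```

These figures are in the same ballpark as the ~70% and 93.5% I observed, but that does not prove this is the actual source of the fixed cost.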