Tags: apache-spark, apache-spark-sql, hadoop-yarn, google-cloud-dataproc, dataproc

YARN CPU usage and the htop results on workers are inconsistent. I am running a Spark cluster on Dataproc


I am on a Dataproc managed Spark cluster.

My cluster configuration: two worker nodes with 16 cores each.

Necessary imports:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

I start the SparkSession with the following (note the increased spark.sql.files.maxPartitionBytes):

spark = (
    SparkSession.builder
    .config("spark.executor.cores", "15")
    .config("spark.executor.instances", "2")
    .config("spark.executor.memory", "12100m")
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.sql.adaptive.enabled", False)
    .config("spark.sql.files.maxPartitionBytes", "10g")
    .getOrCreate()
)
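
Note that getOrCreate() silently reuses an existing session and ignores these configs if one is already running (common in Dataproc notebooks), so a quick check that the values actually took effect doesn't hurt:

# verify the settings were applied to this session
print(spark.sparkContext.getConf().get("spark.executor.cores"))  # expect '15'
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))       # expect '10g'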

I have a CSV file that takes up ~40 GiB on disk.

I read it in and cache it with the following:

df_covid = spark.read.csv("gs://xxxxxxxx.appspot.com/spark_datasets/covid60g.csv",
                          header=True, inferSchema=False)
df_covid.cache()
df_covid.count()
df_covid.rdd.getNumPartitions()
#output: 30
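
The 30 partitions fall out of Spark's file-split sizing. The sketch below mirrors the logic of FilePartition.maxSplitBytes in the Spark source; the ~40 GiB size and the default open cost are assumptions:

# rough sketch of Spark's split sizing for file sources
total_bytes = 40 * 1024**3           # ~40 GiB CSV, single file (assumed)
open_cost = 4 * 1024**2              # spark.sql.files.openCostInBytes default (4 MiB)
default_parallelism = 2 * 15         # 2 executors x 15 cores
max_partition_bytes = 10 * 1024**3   # spark.sql.files.maxPartitionBytes = 10g

bytes_per_core = (total_bytes + open_cost) / default_parallelism
max_split_bytes = min(max_partition_bytes, max(open_cost, bytes_per_core))
print(round(total_bytes / max_split_bytes))  # ~30, matching getNumPartitions()

With maxPartitionBytes raised to 10g, the bytes-per-core term (~1.3 GiB) is what actually caps the split size, giving roughly one partition per core.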

The following is my Storage tab after that:

[Spark UI Storage tab screenshot]

10.3 GiB deserialized in memory and 3.9 GiB serialized on disk.
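
DataFrame.cache() persists with MEMORY_AND_DISK, which is why the portion that doesn't fit into storage memory spills to disk instead of being recomputed. You can confirm the level from the driver:

print(df_covid.storageLevel)  # Disk Memory Deserialized 1x Replicated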

Now, I want to check the CPU usage from the YARN UI and compare it with the htop results on the individual workers. One complication:

  1. The Dataproc YARN monitoring UI has a min_alignment_period of 1 minute: all datapoints within each minute are combined into a single presented point. I therefore make sure to build a relatively heavy sequence of transformations that runs for more than a minute per partition, so the measurement isn't diluted by other workloads (like loading data from storage into execution memory). A toy illustration of this averaging effect follows below.
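
To see why sub-minute bursts get flattened, here is a toy average with assumed numbers (40 busy seconds inside one 60-second alignment window):

# Toy illustration with assumed numbers: a 1-minute alignment window
# averages away any burst shorter than the window itself.
busy_seconds, window_seconds = 40, 60
aligned_point = busy_seconds / window_seconds
print(f"reported utilization: {aligned_point:.0%}")  # 67%, even if htop showed 100% during the burst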

I use the following transformations:

@udf(returnType=StringType())
def f1(x):
    # shift every character's code point up by one
    out = ''
    for i in x:
        out += chr(ord(i) + 1)
    return out

@udf(returnType=StringType())
def f2(x):
    # shift every character's code point down by one (undoes f1)
    out = ''
    for i in x:
        out += chr(ord(i) - 1)
    return out

df_covid = df_covid.withColumn("_catted", F.concat_ws('',*df_covid.columns))

for i in range(10):
    df_covid = df_covid.withColumn("_catted", f1(F.col("_catted")))
    df_covid = df_covid.withColumn("_catted", f2(F.col("_catted")))
df_covid = df_covid.withColumn("esize1", F.length(F.split("_catted", "e").getItem(1)))
df_covid = df_covid.withColumn("asize1", F.length(F.split("_catted", "a").getItem(1)))
df_covid = df_covid.withColumn("isize1", F.length(F.split("_catted", "i").getItem(1)))
df_covid = df_covid.withColumn("nsize1", F.length(F.split("_catted", "n").getItem(1)))
df_covid = df_covid.filter((df_covid.esize1 > 5) & (df_covid.asize1 > 5) & (df_covid.isize1 > 5) & (df_covid.nsize1 > 5))
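
The two UDFs are deliberate CPU burners: f2 undoes f1, so the ten round trips leave _catted unchanged while keeping the Python workers busy. A quick local check of the round-trip property (plain Python, no Spark needed):

# shifting code points up and then down is the identity
s = "covid-19"
shifted = ''.join(chr(ord(c) + 1) for c in s)
restored = ''.join(chr(ord(c) - 1) for c in shifted)
assert restored == s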

Now I call an action to start the computations:

df_covid.count()
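
To confirm the job really spans several 1-minute alignment windows, the same action can be wrapped in a wall-clock timer:

import time

start = time.time()
df_covid.count()
print(f"count() took {time.time() - start:.0f} s")  # should be well over 60 s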

I monitor htop on my two worker nodes. Within a minute of calling the action, htop on both workers shows all cores fully utilized, and they remain fully utilized for about 3-4 minutes:

[htop and uptime screenshots from both worker nodes]

As the load averages in the screenshots show, my cores are going full-tilt: all 16 cores on each worker are completely utilized. The uptime readings also confirm that the cores stay fully utilized for well over 2 minutes; in practice, for about 3+ minutes.

My issue is that the CPU utilization reported by the YARN metrics in Dataproc monitoring doesn't agree. Here are the CPU utilization charts from the same period:

[Dataproc monitoring CPU utilization charts]

They show a maximum CPU usage of ~70%.

What is the reason for the discrepancy between the YARN monitoring and htop? I have seen YARN CPU utilization go above 90% for other people, and a quick Google search shows the same. How is that achieved?


Solution

  • Spark's fixed costs are a significant proportion of the tiny cluster I was running my queries on. After scaling up to 12 worker nodes of the same configuration, the reported CPU usage reached 93.5%.
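
A back-of-the-envelope check that is consistent with this (my assumption: the chart averages CPU over every node in the cluster, including the mostly idle master):

cores_per_node = 16

# 2-worker cluster: htop showed all 32 worker cores busy, master mostly idle
print(2 * cores_per_node / (3 * cores_per_node))    # 0.67 -> in the ballpark of the ~70% observed

# 12-worker cluster: the fixed master overhead is now a much smaller fraction
print(12 * cores_per_node / (13 * cores_per_node))  # 0.92 -> close to the 93.5% reported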