python apache-spark pyspark mlops nuclio

When would I use Spark Operator vs Spark Standalone in Iguazio?


I see in the services UI that I can create a Spark cluster. I also see that I can use the Spark operator runtime when executing a job. What is the use case for each and why would I choose one vs the other?


Solution

  • There are two ways of using Spark in Iguazio:

    1. Create a standalone Spark cluster via the Iguazio UI (like you found on the services page). This is a persistent cluster that you can associate with multiple jobs, Jupyter notebooks, etc. It is a good choice for long-running computations with a static pool of resources. An overview of the Spark service in Iguazio can be found here, along with some ingestion examples.
      • When creating a JupyterLab instance in the UI, there is an option to associate it with an existing Spark cluster. This lets you use PySpark out of the box (a minimal connection sketch is shown at the end of this answer).
    2. Create an ephemeral Spark cluster via the Spark Operator. This is a temporary cluster that exists only for the duration of the job, which makes it a good choice for shorter, one-off jobs with a static or variable pool of resources. The Spark Operator runtime is usually the better option if you don't need a persistent Spark cluster. Some examples of using the Spark Operator on Iguazio can be found here, as well as below.
    import mlrun
    import os
    
    # set up a new Spark function with the Spark Operator
    # command points to our Spark code, which needs to be located on our file system
    # the name param must be lowercase (Kubernetes naming convention)
    sj = mlrun.new_function(kind='spark', command='spark_read_csv.py', name='sparkreadcsv')
    
    # set Spark driver config (gpu_type & gpus=<number_of_gpus> are supported too)
    sj.with_driver_limits(cpu="1300m")
    sj.with_driver_requests(cpu=1, mem="512m")
    
    # set Spark executor config (gpu_type & gpus=<number_of_gpus> are supported too)
    sj.with_executor_limits(cpu="1400m")
    sj.with_executor_requests(cpu=1, mem="512m")
    
    # add FUSE, daemon & Iguazio's JARs support
    sj.with_igz_spark()
    
    # set Spark driver volume mount (sj is already the function object)
    # sj.with_driver_host_path_volume("/host/path", "/mount/path")
    
    # set Spark executor volume mount
    # sj.with_executor_host_path_volume("/host/path", "/mount/path")
    
    # args are also supported
    sj.spec.args = ['-spark.eventLog.enabled','true']
    
    # add python module
    sj.spec.build.commands = ['pip install matplotlib']
    
    # Number of executors
    sj.spec.replicas = 2 
    
    # Rebuild the image with MLRun - needed in order to support artifact logging etc.
    sj.deploy()
    
    # Run the task while setting the artifact path on which our run artifacts (if any) will be saved
    sj.run(artifact_path='/User')
    

    Where the spark_read_csv.py file looks like:

    from pyspark.sql import SparkSession
    from mlrun import get_or_create_ctx
    
    context = get_or_create_ctx("spark-function")
    
    # build spark session
    spark = SparkSession.builder.appName("Spark job").getOrCreate()
    
    # read csv
    df = spark.read.load('iris.csv', format="csv",
                         sep=",", header="true")
    
    # compute summary statistics to log
    df_to_log = df.describe().toPandas()
    
    # log final report
    context.log_dataset("df_sample",
                         df=df_to_log,
                         format="csv")
    spark.stop()
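
    For comparison, using the standalone Spark cluster (option 1) from an associated JupyterLab service is plain PySpark. The sketch below assumes the Jupyter service was linked to the Spark cluster when it was created, so the default Spark configuration already points at that cluster; the v3io path is only illustrative.

    from pyspark.sql import SparkSession
    
    # the Jupyter service associated with the standalone Spark cluster is
    # pre-configured with that cluster's master, so getOrCreate() attaches to it
    spark = SparkSession.builder.appName("standalone-cluster-example").getOrCreate()
    
    # illustrative read from the Iguazio data layer (adjust the path to your data)
    df = spark.read.csv("v3io://users/admin/examples/iris.csv",
                        header=True, inferSchema=True)
    df.show(5)
    
    spark.stop()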