Tags: google-cloud-platform, pyspark, google-cloud-dataproc, dataproc, python-pex

Packaging PySpark with PEX environment on dataproc


I'm trying to package a PySpark job with PEX to run on Google Cloud Dataproc, but I'm getting a Permission Denied error.

I've packaged my third-party and local dependencies into env.pex and an entrypoint that uses those dependencies into main.py. I then gsutil cp those two files up to gs://<PATH> and run the submission script below.
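Roughly, the packaging and upload step looks like this (the exact pex invocation is illustrative, and src stands in for wherever the local code lives):

# Build the PEX from third-party requirements plus local sources;
# --include-tools embeds pex-tools (enables PEX_TOOLS=1 subcommands such as venv).
pex -r requirements.txt -D src --include-tools -o env.pex

# Stage the PEX and the entrypoint on GCS.
gsutil cp env.pex main.py gs://<PATH>/

The submission script: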

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id: str, region: str, cluster_name: str):
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = job_client.submit_job_as_operation(
        request={
            "project_id": project_id,
            "region": region,
            "job": {
                "placement": {"cluster_name": cluster_name},
                "pyspark_job": {
                    "main_python_file_uri": "gs://<PATH>/main.py",
                    "file_uris": ["gs://<PATH>/env.pex"],
                    "properties": {
                        "spark.pyspark.python": "./env.pex",
                        "spark.executorEnv.PEX_ROOT": "./.pex",
                    },
                },
            },
        }
    )

The error I get is

Exception in thread "main" java.io.IOException: Cannot run program "./env.pex": error=13, Permission denied
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:97)
    at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: error=13, Permission denied
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 14 more

Should I expect packaging my environment like this to work? I don't see a way to change the permissions of files included as file_uris in the PySpark job config, and I can't find any Google Cloud documentation on packaging with PEX, but the official PySpark docs include this guide.
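For reference, the approach in that guide boils down to roughly the following spark-submit invocation, which is what I'm trying to reproduce through the Dataproc job API:

export PYSPARK_DRIVER_PYTHON=python   # driver uses the system interpreter
export PYSPARK_PYTHON=./env.pex       # executors run the shipped PEX
spark-submit \
    --files env.pex \
    --conf spark.executorEnv.PEX_ROOT=./.pex \
    main.py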

Any help is appreciated - thanks!


Solution

  • In the end I wasn't able to run the pex directly, but I did get a workaround going for now, which was suggested by a user in the Pants Slack community (thanks!).

    The workaround is to unpack the pex as a venv in a cluster initialization script.

    The initialization script, copied with gsutil to gs://<PATH TO INIT SCRIPT>:

    #!/bin/bash
    
    set -exo pipefail
    
    readonly PEX_ENV_FILE_URI=$(/usr/share/google/get_metadata_value attributes/PEX_ENV_FILE_URI || true)
    readonly PEX_FILES_DIR="/pexfiles"
    readonly PEX_ENV_DIR="/pexenvs"
    
    function err() {
        echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $*" >&2
        exit 1
    }
    
    function install_pex_into_venv() {
        local -r pex_name=${PEX_ENV_FILE_URI##*/}
        local -r pex_file="${PEX_FILES_DIR}/${pex_name}"
        local -r pex_venv="${PEX_ENV_DIR}/${pex_name}"
    
        echo "Installing pex from ${pex_file} into venv ${pex_venv}..."
        gsutil cp "${PEX_ENV_FILE_URI}" "${pex_file}"
        PEX_TOOLS=1 python "${pex_file}" venv --compile "${pex_venv}"
    }
    
    function main() {
        if [[ -z "${PEX_ENV_FILE_URI}" ]]; then
            err "ERROR: Must specify PEX_ENV_FILE_URI metadata key"
        fi
    
        install_pex_into_venv
    }
    
    main
    

    To start the cluster and run the initialization script, which unpacks the pex into a venv on every node of the cluster:

    from google.cloud import dataproc_v1 as dataproc
    
    def start_cluster(project_id: str, region: str, cluster_name: str):
        cluster_client = dataproc.ClusterControllerClient(...)
        operation = cluster_client.create_cluster(
            request={
                "project_id": project_id,
                "region": region,
                "cluster": {
                    "project_id": project_id,
                    "cluster_name": cluster_name,
                    "config": {
                        "master_config": <CONFIG>,
                        "worker_config": <CONFIG>,
                        "initialization_actions": [
                            {
                                "executable_file": "gs://<PATH TO INIT SCRIPT>",
                            },
                        ],
                        "gce_cluster_config": {
                            "metadata": {"PEX_ENV_FILE_URI": "gs://<PATH>/env.pex"},
                        },
                    },
                },
            }
        )
    
    

    To submit the pyspark job using the unpacked pex venv's interpreter:

    def submit_job(project_id: str, region: str, cluster_name: str):
        job_client = dataproc.JobControllerClient(...)
        operation = job_client.submit_job_as_operation(
            request={
                "project_id": project_id,
                "region": region,
                "job": {
                    "placement": {"cluster_name": cluster_name},
                    "pyspark_job": {
                        "main_python_file_uri": "gs://<PATH>/main.py",
                        "properties": {
                            "spark.pyspark.python": "/pexenvs/env.pex/bin/python",
                        },
                    },
                },
            }
        )
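
    To tie the two together, a minimal calling sketch (project, region, and cluster name are placeholders); both helpers kick off long-running operations, so in practice you would wait on each operation.result() before moving on:

    if __name__ == "__main__":
        project_id, region, cluster_name = "<PROJECT>", "<REGION>", "<CLUSTER>"

        # Create the cluster; the initialization action unpacks env.pex into
        # /pexenvs on every node as part of cluster creation.
        start_cluster(project_id, region, cluster_name)

        # Submit the PySpark job against the unpacked venv's interpreter.
        submit_job(project_id, region, cluster_name)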