databricks, databricks-cli, databricks-dbx

Databricks DBX named parameters through job


I am trying to implement parameter passing as mentioned here, where I don't have the variables in the conf file but pass them as named arguments instead.

When running in local mode with a Python debugger, I can easily pass this as:

Fundingsobj = SomeClass(init_conf={"localmode": "true", "fundingsdatapath": "tmp/fundings"})
Fundingsobj.launch()

where SomeClass inherits from Task.
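
For context, Task is the base class from the dbx project template. Roughly, it stores whatever is passed as init_conf on self.conf (a simplified sketch for illustration only; the real template also wires up Spark and logging):

from abc import ABC, abstractmethod
from typing import Dict, Optional


class Task(ABC):
    """Simplified sketch of the dbx template's Task base class (illustrative only)."""

    def __init__(self, init_conf: Optional[Dict] = None):
        # In the real template, conf falls back to a file passed via --conf-file
        # when init_conf is not provided.
        self.conf = init_conf if init_conf is not None else {}

    @abstractmethod
    def launch(self):
        """Implemented by concrete tasks such as SomeClass."""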

However, I can't seem to pass this through the deployment.yaml. I have tried several variants:

Attempt 1

Attempt 2

Attempt 3

Attempt 4

This is how I tried to read the values:

class SomeClass(Task):
    """Class containing methods for generating test data."""

    def initialize(self):
        """Initialize method."""
        self.localmode = self.conf["localmode"]

This works fine if I use the normal --conf-file option in the deployment.yaml and provide the values there, or if I use init_conf with the local debugger.

How do I pass variables to the job without relying on a conf file?

The idea is that, once the job is deployed to Databricks, I would like to schedule it from Airflow and pass in the variables every day.
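
For the Airflow side, this is the kind of trigger I have in mind. A minimal sketch, assuming the apache-airflow-providers-databricks package and the Jobs run-now API (which accepts python_named_params for wheel tasks); the job_id and connection id are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

with DAG(
    dag_id="trigger_fundings_job",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_job = DatabricksRunNowOperator(
        task_id="run_fundings_job",
        databricks_conn_id="databricks_default",  # placeholder connection id
        job_id=12345,  # placeholder: the job id created by dbx deploy
        json={
            # python_named_params maps onto the wheel task's named parameters
            "python_named_params": {
                "localmode": "false",
                "fundingsdatapath": "tmp/fundings",
            }
        },
    )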

Each of these attempts results in an error when launching the job.

EDIT 1: I have tried to use kwargs, but even that gives me the same error:

named_parameters: {"localmode": "true", "fundingsdatapath": "tmp/fundings"}

and then tried to consume it using:

def initialize(self, **kwargs):
    """Initialize method."""
    self.localmode = kwargs["localmode"]

Solution

  • I found the answer. Basically, one has to use argparse for this.

    So, after I defined my YAML (in deployment.yaml) as:

          - name: "clientscoretestdatageneratorusingparams"
            tasks:
              - task_key: "loadtestdataparams"
                <<:
                  - *basic-static-cluster
                libraries:
                  - pypi:
                      package: someadditionalpkg
                      repo: http://internalartifactoryurl
                python_wheel_task:
                  package_name: "workflows"
                  entry_point: clientscoretestdatagenerator
                  named_parameters: {"localmode": "true","fundingsdatapath": "tmp/fundings"}
    

    then, in the entrypoint method:

    from argparse import ArgumentParser
    # ... class and its other methods ...
    def entrypoint():  # pragma: no cover
        """Entrypoint for spark wheel jobs."""
        parser = ArgumentParser()
        parser.add_argument("--localmode", dest="localmode", default=False)
        parser.add_argument("--fundingsdatapath", dest="fundingsdatapath", default="tmp/fundings")
        parser.add_argument("--datalakename", dest="datalakename", default="datalakename")
    
        args = parser.parse_args()
        fundingsobj = GenerateClientScoreData()
        fundingsobj.launch(args)
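
    As far as I can tell, Databricks passes named_parameters to the wheel entry point as command-line arguments of the form --key=value, which is why argparse is the right tool here. A quick way to sanity-check the parsing locally, assuming that argument format:

        # Assumption: named_parameters arrive as "--key=value" strings on the command line.
        from argparse import ArgumentParser

        parser = ArgumentParser()
        parser.add_argument("--localmode", dest="localmode", default=False)
        parser.add_argument("--fundingsdatapath", dest="fundingsdatapath", default="tmp/fundings")

        # Simulate the arguments the job run would hand to the entry point.
        args = parser.parse_args(["--localmode=true", "--fundingsdatapath=tmp/fundings"])
        print(args.localmode, args.fundingsdatapath)  # -> true tmp/fundings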
    

    and then consume it in the task using:

       def initialize(self, args):
           """Initialize method."""
           self.localmode = args.localmode
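
    One caveat worth noting: argparse hands these values over as plain strings, so "true" is not a Python boolean. A small helper (illustrative, not part of the template) makes the conversion explicit:

        def parse_bool(value) -> bool:
            """Convert argparse string values like 'true'/'false' into a real bool."""
            return str(value).lower() in ("true", "1", "yes")

        # e.g. inside initialize():
        #     self.localmode = parse_bool(args.localmode)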