I am trying to implement the approach mentioned here, where the variables are not in the conf file but are passed as named arguments.
When running in local mode with a Python debugger, I can easily pass them like this:
Fundingsobj = SomeClass(init_conf={"localmode": "true", "fundingsdatapath": "tmp/fundings"})
Fundingsobj.launch()
where SomeClass inherits from Task.
However, I can't seem to pass them through the deployment.yaml. I have tried many variations.
This is how I tried to read the values:
class SomeClass(Task):
    """Class containing methods for generating test data."""

    def initialize(self):
        """Initialize method."""
        self.localmode = self.conf["localmode"]
This works fine if I use the normal --conf-file option in the deployment.yaml
and provide the values there, or if I use init_conf with the local debugger.
How do I pass variables to the job without relying on a conf file?
The idea is that, after the job is deployed in Databricks, I would like to schedule it from Airflow and pass in variables every day.
Error while launching the job -
EDIT 1: I have tried to use kwargs, but even that gives me the same error:
named_parameters: {"localmode": "true","fundingsdatapath": "tmp/fundings"}
and then tried to consume them using
def initialize(self, **kwargs):
    """Initialize method."""
    self.localmode = kwargs["localmode"]
I found the answer. Basically, one has to use argparse for this.
So, I defined the job in deployment.yaml as follows:
- name: "clientscoretestdatageneratorusingparams"
  tasks:
    - task_key: "loadtestdataparams"
      <<:
        - *basic-static-cluster
      libraries:
        - pypi:
            package: someadditionalpkg
            repo: http://internalartifactoryurl
      python_wheel_task:
        package_name: "workflows"
        entry_point: clientscoretestdatagenerator
        named_parameters: {"localmode": "true","fundingsdatapath": "tmp/fundings"}
then, in the entrypoint function, I parsed the arguments:
from argparse import ArgumentParser

# ... class and its other methods ...

def entrypoint():  # pragma: no cover
    """Entrypoint for spark wheel jobs."""
    parser = ArgumentParser()
    parser.add_argument("--localmode", dest="localmode", default=False)
    parser.add_argument("--fundingsdatapath", dest="fundingsdatapath", default="tmp/fundings")
    parser.add_argument("--datalakename", dest="datalakename", default="datalakename")
    args = parser.parse_args()
    fundingsobj = GenerateClientScoreData()
    fundingsobj.launch(args)
and then consumed them using
def initialize(self, args):
    """Initialize method."""
    self.localmode = args.localmode
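
One caveat with the parser above: argparse delivers every command-line value as a string, so `--localmode` arrives as "true" or "false", and the string "false" is still truthy in Python. The following is a minimal sketch of one way to handle that, assuming the named_parameters reach the entry point as `--key=value` command-line arguments. The helper names `str_to_bool` and `parse_job_args` are hypothetical and not part of dbx or Databricks; the sample argument list only simulates what the job might receive.

```python
from argparse import ArgumentParser, ArgumentTypeError


def str_to_bool(value: str) -> bool:
    """Convert a command-line string such as "true" or "false" to a bool."""
    normalized = value.strip().lower()
    if normalized in {"true", "1", "yes"}:
        return True
    if normalized in {"false", "0", "no"}:
        return False
    raise ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_job_args(argv=None):
    """Parse the job parameters; argv=None falls back to sys.argv[1:]."""
    parser = ArgumentParser()
    parser.add_argument("--localmode", type=str_to_bool, default=False)
    parser.add_argument("--fundingsdatapath", default="tmp/fundings")
    parser.add_argument("--datalakename", default="datalakename")
    # parse_known_args ignores any extra arguments instead of exiting,
    # in case the runtime appends arguments of its own (an assumption).
    args, _unknown = parser.parse_known_args(argv)
    return args


# Simulated arguments, in the form the named_parameters are assumed to arrive.
args = parse_job_args(["--localmode=false", "--fundingsdatapath=tmp/fundings"])
print(args.localmode, args.fundingsdatapath, args.datalakename)
```

Passing an explicit list to `parse_job_args` also makes the parsing easy to exercise from a local debugger or a unit test, without touching `sys.argv`.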