python-3.x google-cloud-datastore google-cloud-dataflow value-provider

Google Dataflow: passing a Datastore key as an input parameter


I am trying to create a Google Dataflow template that reads a JSON file and loads it into Google Datastore. Below is my code.

I can load the data successfully. However, I would like to pass the Datastore key/kind as an input parameter to my template and create entities using it. Can someone help me with how to pass it in code?

Below is the code snippet that reads the input at run time; --datastore_key is one of the parameters.

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--json_input',
                dest='json_input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

        parser.add_value_provider_argument(
                '--project_id',
                dest='project_id',
                type=str,
                required=False,
                help='Input Project ID.')

        parser.add_value_provider_argument(
                '--datastore_key',
                dest='datastore_key',
                type=str,
                required=False,
                help='The Key name')

Below is the snippet where I assign datastore_key for entity creation, as per the instructions here.

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id):
       self.project_id = project_id

    def start_bundle(self):
        self.client = datastore.Client()

    def start_datastore(self, datastore_key):
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield self.datastore_key.get() + an_int

    def process(self, element):
        try:
            key = self.client.key(datastore_key ,element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))
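A stripped-down reproduction of the snippet above, with no Beam dependency, shows why the runtime kind never reaches the key: the second `def process` replaces the first, nothing ever calls `start_datastore()`, and the bare name `datastore_key` inside `process()` raises `NameError`, which the bare `except:` catches. The class name `Demo` and the helper `old_style_log_message` are hypothetical, made up only for this illustration.

```python
# Hypothetical, Beam-free reduction of the DoFn above (the name `Demo` is made
# up for illustration); it keeps only the parts that explain the bug.
class Demo:
    def start_datastore(self, datastore_key):
        # Beam never calls a method with this name, so this never runs.
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield an_int

    def process(self, element):  # silently replaces the process() just above
        try:
            # No local or global named `datastore_key` exists here.
            return ("kind", datastore_key, element["customerNumber"])
        except NameError as err:
            # The original uses a bare `except:`, which hides this error.
            return err


def old_style_log_message(element):
    # logging.error("Failed with input: ", str(element)) formats lazily as
    # msg % args; the message has no %s placeholder, so formatting raises
    # TypeError. logging catches it and prints "--- Logging error ---".
    return "Failed with input: " % (str(element),)


if __name__ == "__main__":
    demo = Demo()
    print(repr(demo.process({"customerNumber": 42})))
    print(hasattr(demo, "datastore_key"))  # start_datastore() never ran
```

The same lookup failure happens inside the real DoFn, so every element takes the `except` branch and no entity is written.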

I am creating the pipeline as follows:

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))

The Datastore key is not created when I pass the kind as a run-time parameter. If I hard-code it like this, it works:

key = self.client.key('customer' ,element['customerNumber'])

I want something like this:

key = self.client.key(runtime_datastore_key ,runtime_datastore_id)

Can someone please help me with how to pass the Datastore key/kind as a run-time parameter?

Thanks, GS


Solution

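  • It looks like you are not passing the datastore_key value provider to CreateHbaseRow at all. Inside process(), datastore_key is neither a local variable nor an attribute, so the call raises a NameError that the bare except: catches, and no entity is written. (The second process() definition also replaces the first one, and Beam never calls start_datastore().)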


    Try using:

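    import logging

    import apache_beam as beam
    from google.cloud import datastore


    class CreateHbaseRow(beam.DoFn):
        def __init__(self, project_id, datastore_key):
            # Store the ValueProviders themselves; their values are only
            # available once the job is running.
            self.project_id = project_id
            self.datastore_key = datastore_key
    
        def start_bundle(self):
            self.client = datastore.Client()
    
        def process(self, element):
            try:
                # Resolve the runtime kind with .get() inside process().
                key = self.client.key(self.datastore_key.get(), element['customerNumber'])
                entity = datastore.Entity(key=key)
                entity.update(element)
                self.client.put(entity)
            except Exception:
                # Use a %s placeholder and log the traceback instead of hiding it.
                logging.exception("Failed with input: %s", element)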
    

    Note that I kept project_id since it seemed like you wanted it, but the code above doesn't use it.


    You also need to pass the relevant value providers from the options instance to your DoFn. If options is a plain PipelineOptions object, read your custom arguments through options.view_as(MyOptions). Your pipeline creation code then becomes:

    my_options = options.view_as(MyOptions)
    p = beam.Pipeline(options=options)
    
    lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
    lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
    lines_json | "Create Entities From Json" >> beam.ParDo(
        CreateHbaseRow(my_options.project_id, my_options.datastore_key))
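To check the pattern (keep the ValueProvider in __init__, call .get() inside process()) without Beam or a Datastore emulator, here is a minimal sketch. StubValueProvider, FakeDatastoreClient and CreateRowSketch are hypothetical stand-ins, not library classes; in a real pipeline Beam supplies a RuntimeValueProvider or StaticValueProvider (from apache_beam.options.value_provider) and the client is google.cloud.datastore.Client.

```python
import logging


class StubValueProvider:
    """Hypothetical stand-in for a Beam ValueProvider: .get() returns the value."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class FakeDatastoreClient:
    """Hypothetical stand-in for datastore.Client; records entities instead of writing."""

    def __init__(self):
        self.saved = []

    def key(self, kind, identifier):
        # A (kind, id) tuple is enough to show which kind was used.
        return (kind, identifier)

    def put(self, entity):
        self.saved.append(entity)


class CreateRowSketch:
    """Same shape as the fixed CreateHbaseRow, minus the beam.DoFn base class."""

    def __init__(self, project_id, datastore_key, client_factory):
        # Keep the provider objects; do NOT call .get() at construction time.
        self.project_id = project_id
        self.datastore_key = datastore_key
        self.client_factory = client_factory

    def start_bundle(self):
        self.client = self.client_factory()

    def process(self, element):
        try:
            # .get() runs at execution time, when the template parameter is known.
            key = self.client.key(self.datastore_key.get(), element["customerNumber"])
            entity = dict(element, __key__=key)  # stand-in for datastore.Entity
            self.client.put(entity)
        except Exception:
            logging.exception("Failed with input: %s", element)


if __name__ == "__main__":
    fn = CreateRowSketch(StubValueProvider("my-project"),
                         StubValueProvider("customer"),  # plays the role of --datastore_key
                         FakeDatastoreClient)
    fn.start_bundle()
    fn.process({"customerNumber": 7, "name": "Ann"})
    print(fn.client.saved)  # [{'customerNumber': 7, 'name': 'Ann', '__key__': ('customer', 7)}]
```

Injecting the client factory is only a testing convenience. In the real DoFn you could also create the client in setup(), which Beam calls when it prepares a DoFn instance, rather than once per bundle in start_bundle().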