I am trying to create a Google Cloud Dataflow template that reads a JSON file and loads it into Google Cloud Datastore. Below is my code.
I am able to load the data successfully. However, I would like to pass the Datastore key/Kind as an input parameter to my template and create entities using it. Can someone help me with how to pass it in code?
Below is the snippet that reads input at run time. `--datastore_key` is one of the parameters.
```python
from apache_beam.options.pipeline_options import PipelineOptions


class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--json_input',
            dest='json_input',
            type=str,
            required=False,
            help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
        parser.add_value_provider_argument(
            '--project_id',
            dest='project_id',
            type=str,
            required=False,
            help='Input Project ID.')
        parser.add_value_provider_argument(
            '--datastore_key',
            dest='datastore_key',
            type=str,
            required=False,
            help='The Key name')
```
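Arguments registered with `add_value_provider_argument` are passed around as value provider objects, not plain strings. When the pipeline is staged as a template, the actual value is only filled in when the job is launched, so it has to be read with `.get()` during execution. The plain-Python sketch below models that behaviour; `DeferredParam` and `supply` are hypothetical stand-ins loosely modeled on Beam's runtime value providers, not Beam's actual classes.

```python
class DeferredParam:
    """Hypothetical stand-in for a runtime value provider (not Beam's class)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._supplied = False

    def supply(self, value):
        # In a real template, the runner provides this when the job is launched.
        self._value = value
        self._supplied = True

    def is_accessible(self):
        return self._supplied

    def get(self):
        # Reading the value before the job runs is an error.
        if not self._supplied:
            raise RuntimeError(f"{self.name} is not available until run time")
        return self._value


# Template construction: the parameter object exists but has no value yet.
kind_param = DeferredParam("datastore_key")
print(kind_param.is_accessible())  # False

# Job launch: the value is supplied, and .get() now works.
kind_param.supply("customer")
print(kind_param.get())  # customer
```

The practical consequence is that a `DoFn` should keep the provider object and call `.get()` inside `process` (or `start_bundle`), never while the pipeline graph is being built.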
Below is the snippet where I use `datastore_key` when creating the entity, following the instructions here.
```python
class CreateHbaseRow(beam.DoFn):
    def __init__(self, project_id):
        self.project_id = project_id

    def start_bundle(self):
        self.client = datastore.Client()

    def start_datastore(self, datastore_key):
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield self.datastore_key.get() + an_int

    def process(self, element):
        try:
            key = self.client.key(datastore_key, element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)
            self.client.put(entity)
        except:
            logging.error("Failed with input: ", str(element))
```
I am creating the pipeline as follows:
```python
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))
```
The Datastore key is not created when I pass the Kind as a runtime parameter. If I hard-code it like this, it works:

```python
key = self.client.key('customer', element['customerNumber'])
```

I want something like this:

```python
key = self.client.key(runtime_datastore_key, runtime_datastore_id)
```

Can someone please help me pass the Datastore key/Kind as a runtime parameter?
Thanks, GS
It looks like you are not passing the `datastore_key` value provider to `CreateHbaseRow`.
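This also explains why nothing fails loudly. Python keeps only the last `def process` in a class body, so the `an_int` version is silently replaced, and `start_datastore` is never called by Beam. Inside the remaining `process`, `datastore_key` is a bare name rather than `self.datastore_key`, so it raises `NameError`, which the bare `except:` catches and merely logs. On top of that, `logging.error("Failed with input: ", str(element))` has no `%s` placeholder, so logging reports a formatting error instead of the element. The plain-Python sketch below reproduces the first two effects without Beam; the class name `BrokenShape` is made up for illustration.

```python
import logging


class BrokenShape:
    def process(self, an_int):
        return "first definition"

    # Same name: this definition replaces the one above in the class body.
    def process(self, element):
        try:
            # Bare name, not self.datastore_key: raises NameError when called.
            return datastore_key, element
        except:  # bare except hides the NameError
            logging.error("Failed with input: %s", element)
            return None


print(BrokenShape().process({"customerNumber": 1}))  # None
```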
Try using:
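```python
import logging

import apache_beam as beam
from google.cloud import datastore


class CreateHbaseRow(beam.DoFn):
    def __init__(self, project_id, datastore_key):
        # Store the ValueProviders themselves; call .get() only at run time.
        self.project_id = project_id
        self.datastore_key = datastore_key

    def start_bundle(self):
        self.client = datastore.Client()

    def process(self, element):
        try:
            # Resolve the runtime Kind through self, not through a bare name.
            key = self.client.key(self.datastore_key.get(), element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)
            self.client.put(entity)
        except Exception:
            logging.exception("Failed with input: %s", element)
```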
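To check the fix without launching a Dataflow job, the same `process` logic can be exercised with stand-ins. In this sketch, `StaticParam` plays the role of a value provider that already holds a value (similar in spirit to Beam's `StaticValueProvider`), and `FakeClient` is a hypothetical in-memory substitute for `datastore.Client` that records `(kind, id)` keys and stores entities as plain dicts. Neither is a real Beam or Datastore class.

```python
import logging


class StaticParam:
    """Hypothetical stand-in for a value provider that already has a value."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class FakeClient:
    """Hypothetical in-memory substitute for datastore.Client."""

    def __init__(self):
        self.stored = {}

    def key(self, kind, identifier):
        return (kind, identifier)

    def put(self, entity):
        self.stored[entity["__key__"]] = entity


class CreateRowLogic:
    """Same process() logic as the corrected DoFn, minus Beam and Datastore."""

    def __init__(self, datastore_key, client):
        self.datastore_key = datastore_key  # keep the provider, not its value
        self.client = client

    def process(self, element):
        try:
            # .get() is called here, at execution time, through self.
            key = self.client.key(self.datastore_key.get(), element["customerNumber"])
            entity = dict(element, __key__=key)
            self.client.put(entity)
        except Exception:
            logging.exception("Failed with input: %s", element)


client = FakeClient()
logic = CreateRowLogic(StaticParam("customer"), client)
logic.process({"customerNumber": 7, "name": "Ada"})
print(list(client.stored))  # [('customer', 7)]
```

Because the Kind is the same for every element, reading it once in `start_bundle` would also work, since that also runs at execution time; calling `.get()` in `process`, as above, is simpler and equally correct.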
Note that I kept `project_id` since it seemed like you wanted it, but the code above doesn't use it.
You also need to pass the relevant value providers from the `options` instance to your `DoFn`, so your pipeline creation code becomes:
```python
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))
```
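Both pipeline snippets rely on a `ConvertToJson` DoFn that isn't shown. Since `CreateHbaseRow` indexes `element['customerNumber']`, it presumably turns each text line into a dict. A minimal sketch, assuming the input is newline-delimited JSON (one object per line, matching how `ReadFromText` splits a file into lines); `convert_to_json` is a hypothetical name, and a function like this could be applied with `beam.FlatMap(convert_to_json)` in place of `beam.ParDo(ConvertToJson())`.

```python
import json
import logging


def convert_to_json(line):
    """Parse one line of newline-delimited JSON into a dict.

    Yields nothing for blank, malformed, or non-object lines, so a FlatMap drops them.
    """
    line = line.strip()
    if not line:
        return
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        logging.error("Skipping malformed line: %s", line)
        return
    # Only JSON objects can become Datastore entities via entity.update().
    if isinstance(record, dict):
        yield record


print(list(convert_to_json('{"customerNumber": 42, "name": "Ada"}')))
# [{'customerNumber': 42, 'name': 'Ada'}]
```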