python google-cloud-dataflow apache-beam data-pipeline

NameError: name 'datetime' is not defined [while running 'ChangeDataType DistrictAllocationAndListStore-ptransform-570']


I wrote code to ingest data from a CSV file into Google BigQuery, using Apache Beam for the pipeline.

This is the pipeline code:

list_of_data = open_file()

# Bind the name to the converted rows themselves, not to the result of
# WriteToText, so that every downstream sink (for example a BigQuery write)
# receives the row dicts rather than the text sink's output.
DistrictAllocationAndListStore_data = (p 
                                | 'CreateDictData from DistrictAllocationAndListStore File' >> beam.Create(list_of_data)
                                | 'RenameDictKey DistrictAllocationAndListStore' >> beam.Map(rename_key)
                                | 'ChangeDataType DistrictAllocationAndListStore' >> beam.Map(convert_types_DistrictAllocationAndListStore)
                                )

# Separate branch: also dump the converted rows to text files under output/.
DistrictAllocationAndListStore_data | 'Write DistrictAllocationAndListStore' >> WriteToText('output/data-branchessap', '.txt')


# Write to BQ
# Configure the BigQuery sink first, then attach it to the pipeline.
# WRITE_TRUNCATE / CREATE_IF_NEEDED are passed as the write and create
# dispositions; batch_size is left at the library default.
bigquery_sink = beam.io.WriteToBigQuery(
    table=table_id_tender,
    dataset=dataset_id,
    project=project_id,
    schema=schema_tenders_master,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
)

DistrictAllocationAndListStore_data | 'Write to BQ DistrictAllocationAndListStore' >> bigquery_sink

And this is the convert_types_DistrictAllocationAndListStore method:

def convert_types_DistrictAllocationAndListStore(data):
    """Convert one row's values to strings and expand its opening date.

    The row dict is modified in place and also returned, so the function can
    be passed directly to ``beam.Map``.

    Args:
        data: Dict for a single row. ``site_code``, ``store_name``, ``city``,
            ``type`` and ``region_no`` are converted with ``str`` (or set to
            ``None`` when the key is absent). ``opening_date``, when present
            and non-empty, must be a ``YYYY-MM-DD`` string.

    Returns:
        The same dict, with ``opening_date`` as an ISO date string (or
        ``None``) and the derived keys ``opening_date_year``,
        ``opening_date_month``, ``opening_date_day``,
        ``opening_date_dayname`` and ``opening_date_weeks`` as strings
        (empty strings when there is no opening date).

    Raises:
        ValueError: If ``opening_date`` is non-empty but does not match
            ``%Y-%m-%d``.
    """
    # Imported inside the function on purpose: on remote runners such as
    # Dataflow this function is pickled and executed on workers, where the
    # main module's top-level imports are not available unless the pipeline
    # is run with the save_main_session option.
    import datetime

    date_format = '%Y-%m-%d'

    for field in ('site_code', 'store_name', 'city', 'type', 'region_no'):
        data[field] = str(data[field]) if field in data else None

    raw_date = data.get('opening_date')
    # A missing key or None is treated like an empty string; passing None to
    # strptime would raise TypeError.
    if raw_date:
        parsed = datetime.datetime.strptime(raw_date, date_format)
        data['opening_date'] = str(parsed.date())
        data['opening_date_year'] = str(parsed.year)
        data['opening_date_month'] = str(parsed.month)
        data['opening_date_day'] = str(parsed.day)
        data['opening_date_dayname'] = parsed.strftime("%A")
        # %W: week number of the year, counting Monday as the first weekday.
        data['opening_date_weeks'] = parsed.strftime("%W")
    else:
        data['opening_date'] = None
        data['opening_date_year'] = ""
        data['opening_date_month'] = ""
        data['opening_date_day'] = ""
        data['opening_date_dayname'] = ""
        data['opening_date_weeks'] = ""

    return data

However, when I commented out the Write to BQ code and wrote only to a local file (using the local runner), the code ran successfully without errors. But when I tried to write to BQ (running with the Dataflow runner), I got this error:

    Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 993, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 351, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/media/arroganthooman/DATA/Fikri/UI/Magang/Script/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1570, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "sb-fin-district_allocation_list_store.py", line 104, in convert_types_DistrictAllocationAndListStore
NameError: name 'datetime' is not defined [while running 'ChangeDataType DistrictAllocationAndListStore-ptransform-570']

It seems that datetime is not imported, but I have imported it at the top of my code. Is there a solution?


Solution

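  • You can try to put `import datetime` within the function. On Dataflow the function is pickled and executed on remote workers, and by default the main module's global namespace (including its top-level imports) is not shipped along with it, so `datetime` is undefined there even though it is imported at the top of the script. Alternatively, keep the top-level import and run the pipeline with the `--save_main_session` pipeline option, which pickles the main session so that global imports are available on the workers.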