
Apache Beam Python script fails to write to Google BigQuery


I cannot write my data to Google BigQuery. I keep getting errors such as:

AttributeError: Error trying to access nonexistent attribute `0` in write result. Please see __documentation__ for available attributes. [while running '[8]: Write to BigQuery']

I currently call an API. The API either returns data like the example below or returns None. In the former case, I want to write the data to an existing Google BigQuery table.

[{'datetime': '2023-06-13T14:09:57',
  'user_location': 'Hungry Lion',
  'store_location': 'ZEVENWACHT MALL',
  'device_name': 'UHC',
  'battery_level': 'medium',
  'probe_name': 'UPRIGHT 1',
  'probe_type': 'K Type',
  'probe_number': 1,
  'channel_name': 'Temperature',
  'channel_type': 'Temperature',
  'channel_number': 1,
  'record_type': 'reading',
  'value': 72.2,
  'channel_unit': '°C'},
 {'datetime': '2023-06-13T14:09:57',
  'user_location': 'Hungry Lion',
  'store_location': 'ZEVENWACHT MALL',
  'device_name': 'UHC',
  'battery_level': 'medium',
  'probe_name': 'UPRIGHT 2',
  'probe_type': 'K Type',
  'probe_number': 2,
  'channel_name': 'Temperature',
  'channel_type': 'Temperature',
  'channel_number': 2,
  'record_type': 'reading',
  'value': 66.6,
  'channel_unit': '°C'}]

To me this seems like valid data. I then run the following pipeline, which is supposed to handle the case where the API returns None. In the pipeline below, I first read some data from Cloud Storage to make the API call. The API then returns data that looks like the list of dictionaries above.

SCHEMA = 'datetime:DATETIME,user_location:STRING,store_location:STRING,'+\
    'device_name:STRING,battery_level:STRING,probe_name:STRING,'+\
    'probe_type:STRING,probe_number:INTEGER,channel_name:STRING,'+\
    'channel_type:STRING,channel_number:INTEGER,record_type:STRING,'+\
    'value:FLOAT,channel_unit:STRING'


pipeline = beam.Pipeline()

def api_call(start_datetime):
    API = EasyLogCloudAPI(userGUID,APIToken)
    start_datetime = datetime.datetime.strptime(start_datetime,
                                                DATETIME_FORMAT)
    end_datetime = datetime.datetime.today()
    history = API.get_history(start_datetime=start_datetime,
                              end_datetime=end_datetime,
                              df=False)
    if history is None:
        return None
    return history  # a list of dictionaries

class WriteToBigQuery(beam.DoFn):
    
    def process(self,element):
        if element[0] is None:
            return None
        return element | 'Write to BigQuery' >>  beam.io.WriteToBigQuery(TABLE,schema=SCHEMA,
                                                                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                                  custom_gcs_temp_location=TEMP)

api2bq = (
    pipeline
    | 'Read last seen datetime' >> beam.io.ReadFromText(DATETIME)
    | 'EasyLog Cloud API call' >> beam.Map(api_call)
    | 'Write to BigQuery' >> beam.ParDo(WriteToBigQuery())
)

pipeline.run()

I cannot understand why it will not batch-load data into my BigQuery table. The schema is correct, so I have ruled that out as well. Perhaps I am handling the case where the API returns None incorrectly?

Please help me understand what I am doing wrong.

Furthermore, when I run a modified version without the None-handling logic, I get a different error:

api2bq = (
    pipeline
    | 'Read last seen datetime' >> beam.io.ReadFromText(DATETIME)
    | 'EasyLog Cloud API call' >> beam.Map(api_call)
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(TABLE,schema=SCHEMA,
                                                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                      custom_gcs_temp_location=TEMP)
)

The new error is:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_460_6b53b91f5dd19adf8697c0ca6de4564c_08955518ebba4e399d41cf49592523f6 failed. Error Result: <ErrorProto
 location: 'gs://api-test-321/temp_multi/bq_load/1418752d549c42b3a935ecbbecbdde94/tensile-proxy-386313.dataflow.another-table/5324c897-225d-47b2-8520-15c3af69929c'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://api-test-321/temp_multi/bq_load/1418752d549c42b3a935ecbbecbdde94/tensile-proxy-386313.dataflow.another-table/5324c897-225d-47b2-8520-15c3af69929c'
 reason: 'invalid'> [while running '[9]: Write to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)']

Solution

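  • A DoFn's process() must return an iterable of output elements (or None for no output); the usual way to do this is to use yield instead of return. However, a DoFn is not the place to outsource part of your pipeline (see, for example, this SO post): inside process() each element is a plain Python object, not a PCollection, so applying WriteToBigQuery to it is not how a write step is added to your pipeline. For this, you should use a PTransform.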
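To make the "iterable of output elements" requirement concrete, here is a pure-Python sketch that does not import Beam. `collect_outputs` is a hypothetical helper that only imitates, in simplified form, how a runner consumes what process() hands back: None means no output, otherwise every item of the returned iterable becomes one output element. `process_rows` shows the generator style that skips None and emits each API row on its own.

```python
def collect_outputs(process, element):
    """Hypothetical, simplified stand-in for a runner calling DoFn.process()."""
    result = process(element)
    if result is None:  # a plain function that returns None -> no output
        return []
    return list(result)  # every item of the iterable becomes one output element


def process_rows(element):
    """Generator-style process(): skip None, emit each API row separately."""
    if element is None:
        return  # a bare return inside a generator just ends it without output
    for row in element:
        yield row


rows = [{'probe_number': 1, 'value': 72.2}, {'probe_number': 2, 'value': 66.6}]
print(collect_outputs(process_rows, rows))  # two separate dict elements
print(collect_outputs(process_rows, None))  # []
```

Because process_rows is a generator function, the None case needs no special return value: the generator simply produces nothing.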

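    Furthermore, your process() returns the result of applying WriteToBigQuery, which is a WriteResult object rather than an iterable of rows; Beam then ends up indexing that object with 0, which most likely produces your original error. (Note also that element[0] is None raises a TypeError when the API returned None, because element itself is None.) In addition, WriteToBigQuery expects as input single rows in the form of dictionaries, whereas beam.Map(api_call) emits one element per API call that contains the whole list of rows. This is also the likely cause of your second error: the load job receives that list as one invalid row (Rows: 1; errors: 1).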
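The row-shape problem can be checked without Beam. In this sketch, `schema_field_names` and `is_single_row` are hypothetical helpers and the schema string is the one from the question; it shows that each dict from the API is a valid row candidate, while the list emitted by beam.Map(api_call) is not, and that the list serialises to a JSON array rather than the JSON object BigQuery expects per row (the error message indicates the load job reads JSON).

```python
import json

# Schema string copied from the question.
SCHEMA = ('datetime:DATETIME,user_location:STRING,store_location:STRING,'
          'device_name:STRING,battery_level:STRING,probe_name:STRING,'
          'probe_type:STRING,probe_number:INTEGER,channel_name:STRING,'
          'channel_type:STRING,channel_number:INTEGER,record_type:STRING,'
          'value:FLOAT,channel_unit:STRING')


def schema_field_names(schema):
    """Return the field names from a 'name:TYPE,name:TYPE,...' schema string."""
    return {field.split(':')[0] for field in schema.split(',')}


def is_single_row(element, field_names):
    """True if element is one dict whose keys are all schema fields."""
    return isinstance(element, dict) and set(element) <= field_names


row = {'datetime': '2023-06-13T14:09:57', 'probe_number': 1, 'value': 72.2}
batch = [row, dict(row, probe_number=2, value=66.6)]  # what beam.Map(api_call) emits

fields = schema_field_names(SCHEMA)
print(is_single_row(row, fields))    # True
print(is_single_row(batch, fields))  # False: one list, not one row
print(json.dumps(batch)[0])          # [ (a JSON array, not an object)
```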

    If you simply want to write your result into BigQuery, I would recommend changing your code as follows:

    import datetime

    import apache_beam as beam

    def api_call(start_datetime):
        # EasyLogCloudAPI, userGUID, APIToken and DATETIME_FORMAT come from your own code.
        API = EasyLogCloudAPI(userGUID, APIToken)
        start_datetime = datetime.datetime.strptime(start_datetime,
                                                    DATETIME_FORMAT)
        end_datetime = datetime.datetime.today()
        histories = API.get_history(start_datetime=start_datetime,
                                    end_datetime=end_datetime,
                                    df=False)
        # Yield every row (a dict) separately. If the API returns None,
        # nothing is yielded, so nothing is written to BigQuery.
        if histories is not None:
            for history in histories:
                yield history

    class MyWriteToBigQuery(beam.PTransform):
        def expand(self, pcoll):
            return (
                pcoll
                | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                    TABLE,
                    schema=SCHEMA,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    custom_gcs_temp_location=TEMP)
            )

    (
        pipeline
        | 'Read last seen datetime' >> beam.io.ReadFromText(DATETIME)
        # FlatMap (not Map): each yielded row becomes its own element.
        | 'EasyLog Cloud API call' >> beam.FlatMap(api_call)
        | 'Write to BigQuery' >> MyWriteToBigQuery()
    )

    Since MyWriteToBigQuery only wraps WriteToBigQuery, you can skip the custom PTransform altogether and just use:

    (
        pipeline
        | 'Read last seen datetime' >> beam.io.ReadFromText(DATETIME)
        | 'EasyLog Cloud API call' >> beam.FlatMap(api_call)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            TABLE,
            schema=SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location=TEMP)
    )
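To check locally that a generator version of api_call (one that yields each row) behaves as intended, without Dataflow or the real EasyLog API, you can inject a stub client. This is a sketch under assumptions: `FakeHistoryAPI` is a hypothetical stub whose get_history mirrors the call in the question, `rows_from_api` is api_call with the client passed in, DATETIME_FORMAT is a guess because the real value is not shown, and the two list comprehensions only imitate the difference between beam.Map (one output per input) and beam.FlatMap (outputs flattened).

```python
import datetime

DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S'  # assumption: the real format is not shown


class FakeHistoryAPI:
    """Hypothetical stub standing in for EasyLogCloudAPI."""

    def __init__(self, history):
        self.history = history

    def get_history(self, start_datetime, end_datetime, df=False):
        return self.history  # a list of dicts, or None


def rows_from_api(start_datetime, api):
    """api_call with the client injected; yields one dict per row."""
    start = datetime.datetime.strptime(start_datetime, DATETIME_FORMAT)
    histories = api.get_history(start_datetime=start,
                                end_datetime=datetime.datetime.today(),
                                df=False)
    if histories is not None:
        yield from histories


api = FakeHistoryAPI([{'probe_number': 1}, {'probe_number': 2}])
inputs = ['2023-06-13T14:09:57']

# Imitates beam.Map: one output per input (here, a generator object).
map_like = [rows_from_api(x, api) for x in inputs]
# Imitates beam.FlatMap: every yielded row becomes its own element.
flat_map_like = [row for x in inputs for row in rows_from_api(x, api)]

print(len(map_like))   # 1
print(flat_map_like)   # [{'probe_number': 1}, {'probe_number': 2}]
print(list(rows_from_api(inputs[0], FakeHistoryAPI(None))))  # []
```

Injecting the client keeps the row-producing logic testable on its own; in the pipeline you would still construct the real client inside the function, as in the code above.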