python · google-cloud-platform · google-cloud-dataflow · apache-beam

How to avoid a TypeError when using Python Apache Beam for Dataflow?


So I've got a very basic Python pipeline using Apache Beam and Google Cloud that takes data from Cloud Storage, removes some columns, and moves it into BigQuery. It works up until that final step.

When using WriteToBigQuery(table = ...), I get the following error:

TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

This comes from the isinstance(table, TableReference) check inside the parse_table_reference() function. The weird thing is that if I check manually:

table = TableReference.from_string(...)
print(isinstance(table, TableReference))

Then it comes back as True just fine.

I've tried formatting the table reference in various ways, and all of them produce the same error.

Is this an issue with the way I'm providing the table reference, or is there another way to do this that avoids the error?


Traceback

    TypeError                                 Traceback (most recent call last)
    Input In [1], in <cell line: 37>()
         38 options = PipelineOptions()
         39 p = beam.Pipeline(options = options)
         41 (
         42 p 
         43 | "Read" >> beam.io.textio.ReadFromText(('test_lender_2022-04-17.csv'), skip_header_lines = 1)
         44 | "Split" >> beam.ParDo(Split())
         45 #| "WriteToFile" >> beam.io.textio.WriteToText('testoutput.csv')
    ---> 46 | "WriteToBQ" >> beam.io.WriteToBigQuery(
         47     table = 'other-lender-uploads-test:Lender_Data.Test_Lender', 
         48     schema = 'Date: STRING, Name: STRING', 
         49     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
         50 )
         52 result = p.run()
    
    File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery.py:2083, in WriteToBigQuery.__init__(self, table, dataset, project, schema, create_disposition, write_disposition, kms_key, batch_size, max_file_size, max_files_per_bundle, test_client, custom_gcs_temp_location, method, insert_retry_strategy, additional_bq_parameters, table_side_inputs, schema_side_inputs, triggering_frequency, validate, temp_file_format, ignore_insert_ids, with_auto_sharding, ignore_unknown_columns, load_job_project_id)
       2081 self._dataset = dataset
       2082 self._project = project
    -> 2083 self.table_reference = bigquery_tools.parse_table_reference(
       2084     table, dataset, project)
       2085 self.create_disposition = BigQueryDisposition.validate_create(
       2086     create_disposition)
       2087 self.write_disposition = BigQueryDisposition.validate_write(
       2088     write_disposition)
    
    File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:240, in parse_table_reference(table, dataset, project)
        212 def parse_table_reference(table, dataset=None, project=None):
        213   """Parses a table reference into a (project, dataset, table) tuple.
        214 
        215   Args:
       (...)
        237       format.
        238   """
    --> 240   if isinstance(table, TableReference):
        241     return TableReference(
        242         projectId=table.projectId,
        243         datasetId=table.datasetId,
        244         tableId=table.tableId)
        245   elif callable(table):
    
    TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

Solution

  • Install apache-beam[gcp] instead of plain apache-beam. The plain package does not pull in the Google Cloud client dependencies, so the GCP-specific names that bigquery_tools checks against (such as TableReference) can end up unbound to an actual class, which is why isinstance() complains that its second argument is not a type. Try:

    pip install 'apache-beam[gcp]'

    (The quotes keep shells such as zsh from expanding the square brackets.)
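For context on why the error message reads the way it does: when an optional dependency fails to import, a library sometimes binds the missing class name to a placeholder such as None, and isinstance() then raises exactly the TypeError from the traceback. A minimal sketch of that failure mode (the None binding is an illustrative assumption, not Beam's literal code):

```python
# Stand-in for what can happen when the [gcp] extras are absent:
# the name that should hold a class is bound to None instead.
TableReference = None

try:
    # This mirrors the check in parse_table_reference().
    isinstance("other-lender-uploads-test:Lender_Data.Test_Lender", TableReference)
except TypeError as exc:
    # isinstance() arg 2 must be a type, a tuple of types, or a union
    print(exc)
```

This is also why the manual isinstance(table, TableReference) check in the question succeeds: the TableReference imported there directly is a real class, while the one referenced inside the broken install is not.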