I've got a very basic pipeline in Python using Apache Beam and Google Cloud that takes data from Cloud Storage, removes some columns, and moves it into BigQuery. Everything works up until that final step.
When using WriteToBigQuery(table = ...), I get the following error:
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
This is coming from it checking isinstance(table, TableReference) as part of the parse_table_reference() function. The weird thing is that if I manually check:
table = TableReference.from_string(...)
print(isinstance(table, TableReference))
Then it comes back as True just fine.
I've tried formatting the table reference in various ways and all of them give the same error. Is this an issue with the way I'm providing the table reference, or is there another way to do this that avoids the error?
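For reference, these are roughly the variants I mean (same project, dataset and table as in the traceback below); the TableReference import path here is just my assumption about what Beam expects:

from apache_beam.io.gcp.internal.clients.bigquery import TableReference

# "project:dataset.table" string
table = 'other-lender-uploads-test:Lender_Data.Test_Lender'
# "project.dataset.table" string
table = 'other-lender-uploads-test.Lender_Data.Test_Lender'
# explicit TableReference object
table = TableReference(
    projectId='other-lender-uploads-test',
    datasetId='Lender_Data',
    tableId='Test_Lender')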
Traceback
TypeError Traceback (most recent call last)
Input In [1], in <cell line: 37>()
38 options = PipelineOptions()
39 p = beam.Pipeline(options = options)
41 (
42 p
43 | "Read" >> beam.io.textio.ReadFromText(('test_lender_2022-04-17.csv'), skip_header_lines = 1)
44 | "Split" >> beam.ParDo(Split())
45 #| "WriteToFile" >> beam.io.textio.WriteToText('testoutput.csv')
---> 46 | "WriteToBQ" >> beam.io.WriteToBigQuery(
47 table = 'other-lender-uploads-test:Lender_Data.Test_Lender',
48 schema = 'Date: STRING, Name: STRING',
49 write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
50 )
52 result = p.run()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery.py:2083, in WriteToBigQuery.__init__(self, table, dataset, project, schema, create_disposition, write_disposition, kms_key, batch_size, max_file_size, max_files_per_bundle, test_client, custom_gcs_temp_location, method, insert_retry_strategy, additional_bq_parameters, table_side_inputs, schema_side_inputs, triggering_frequency, validate, temp_file_format, ignore_insert_ids, with_auto_sharding, ignore_unknown_columns, load_job_project_id)
2081 self._dataset = dataset
2082 self._project = project
-> 2083 self.table_reference = bigquery_tools.parse_table_reference(
2084 table, dataset, project)
2085 self.create_disposition = BigQueryDisposition.validate_create(
2086 create_disposition)
2087 self.write_disposition = BigQueryDisposition.validate_write(
2088 write_disposition)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:240, in parse_table_reference(table, dataset, project)
212 def parse_table_reference(table, dataset=None, project=None):
213 """Parses a table reference into a (project, dataset, table) tuple.
214
215 Args:
(...)
237 format.
238 """
--> 240 if isinstance(table, TableReference):
241 return TableReference(
242 projectId=table.projectId,
243 datasetId=table.datasetId,
244 tableId=table.tableId)
245 elif callable(table):
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
Please install apache-beam[gcp] instead of plain apache-beam:

pip install apache-beam[gcp]

Without the GCP extras, Beam's BigQuery modules can't import the GCP client classes, so the TableReference name that parse_table_reference() compares against ends up as None rather than a class, and isinstance(table, None) raises exactly this TypeError. Your manual check passes because the TableReference you imported yourself is a real class; it just isn't the one the plain install leaves behind inside bigquery_tools.
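Once the GCP extras are installed, the same parse_table_reference() call that failed in the traceback should go through. A minimal sanity check, reusing the table spec from the question:

from apache_beam.io.gcp import bigquery_tools

ref = bigquery_tools.parse_table_reference(
    'other-lender-uploads-test:Lender_Data.Test_Lender')
# Expect: other-lender-uploads-test Lender_Data Test_Lender
print(ref.projectId, ref.datasetId, ref.tableId)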