python-3.x, google-bigquery, google-cloud-functions, bq

Loading multiple CSV files into a BQ table on an event trigger


I am trying to load multiple CSV files into a table with the code below, but it is failing. Can anyone let me know where I am wrong?

import os

from google.cloud import bigquery


def csv_loader(data, context):
    client = bigquery.Client()
    dataset_id = os.environ['DATASET']
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
    )

    # get the URI for the uploaded CSV in GCS from 'data'
    uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
    # start the load job
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table(os.environ['TABLE'])
    )

    load_job.result()  # wait for the table load to complete
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
    print('Loaded {} rows.'.format(destination_table.num_rows))


The above works fine when the schema is specified for one file, but with multiple files it gives an error. Can anyone tell me what I am doing wrong?
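For reference, my working single-file version spelled out the schema in the job config, something like this (the column names and types below are placeholders, not my real schema):

# Single-file variant that worked: the schema is given explicitly.
# The fields here are placeholder examples, not the real columns.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("name", "STRING"),
    ],
    source_format=bigquery.SourceFormat.CSV,
    field_delimiter="|",
    write_disposition="WRITE_TRUNCATE",
    skip_leading_rows=1,
)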


Solution

  • Based on your last comment, the schemas are different and you want to use schema auto-detection. However, I don't see that flag in your code, nor do I see you passing the job_config variable to the load method.

    Try something like the below:

    Note: I added autodetect=True to the job_config and also passed job_config to the load_table_from_uri() call.

    import os

    from google.cloud import bigquery


    def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            source_format=bigquery.SourceFormat.CSV,
            field_delimiter="|",
            write_disposition="WRITE_TRUNCATE",
            skip_leading_rows=1,
        )

        # get the URI for the uploaded CSV in GCS from 'data'
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
        # start the load job, passing job_config this time
        load_job = client.load_table_from_uri(
            uri,
            dataset_ref.table(os.environ['TABLE']),
            job_config=job_config
        )

        load_job.result()  # wait for the table load to complete
        print('Job finished.')
        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))
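    One more thing to watch out for with multiple files: the function runs once per uploaded file, and write_disposition="WRITE_TRUNCATE" replaces the table contents on every load, so each new file wipes out the rows from the previous one. If the goal is to accumulate all the files in one table, a config along these lines may be closer to what you want (a sketch only; whether ALLOW_FIELD_ADDITION fits depends on how exactly your schemas differ):

    # Sketch: append each file instead of replacing the table, and let
    # later files add new columns. ALLOW_FIELD_ADDITION only covers
    # schemas that differ by extra fields, not by changed types.
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_APPEND",
        skip_leading_rows=1,
        schema_update_options=[
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
        ],
    )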