google-cloud-platform, google-bigquery, google-cloud-dataflow, apache-beam, apache-beam-io

How to write data from apache beam using gcp dataflow to bigquery table?


I am trying to run the program below with Apache Beam on GCP Dataflow. It should read a CSV file, apply a few transforms (sum, max, and a join), and then write the result to a BigQuery table. Through step 4 I get the expected results, but at step 5 the job fails to write to BigQuery with the error below.

Please help me solve it.

Data:

(screenshot of the sample CSV input data)

Error:

TypeError: 'tuple' object does not support item assignment [while running 'Write to BQ/StorageWriteToBigQuery/Convert dict to Beam Row/Convert dict to Beam Row']
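The TypeError itself is ordinary Python behaviour: tuples are immutable, so any step that attempts item assignment on one fails. A minimal sketch outside Beam reproduces the same message:

```python
# Tuples do not support item assignment; this is the same TypeError
# that the 'Convert dict to Beam Row' step surfaces.
row = ("Argentina", 1100, "Messi", 250)
try:
    row[0] = "Brazil"
except TypeError as err:
    print(err)  # 'tuple' object does not support item assignment
```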

Code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import argparse
from pprint import pprint

parser = argparse.ArgumentParser()
parser.add_argument("--in_file", dest="in_file", required=True, help="input GCS file")
parser.add_argument("--out_table", dest="out_table", required=True, help="output BQ Table")
parser.add_argument("--out_ds", dest="out_ds", required=True, help="output BQ DS")
path_args, pipeline_args = parser.parse_known_args()

input_file = path_args.in_file
output_table = path_args.out_table
output_ds = path_args.out_ds
output_schema = {
    "fields": [
        {"name": "team", "type": "STRING", "mode": "NULLABLE"},
        {"name": "total_goals", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "key_player", "type": "STRING", "mode": "NULLABLE"},
        {"name": "player_goals", "type": "INTEGER", "mode": "NULLABLE"},
    ]
}

pipeline_options = PipelineOptions(pipeline_args)

with beam.Pipeline(options=pipeline_options) as pipeline:
    # step 1
    input_stats = (
        pipeline
        | "Read File" >> beam.io.ReadFromText(file_pattern=input_file, skip_header_lines=1)
        | "Split Line" >> beam.Map(lambda line: line.split(","))
    )

    # step 2
    country_tot_goals = (
        input_stats
        | "Country Goals Key Value Pair" >> beam.Map(lambda rec: (rec[1], int(rec[2])))
        | "CombinePerKey Country Total Goals" >> beam.CombinePerKey(sum)
    )

    # step 3
    country_max_goals = (
        input_stats
        | "Country Player Goals Key Value Pairs" >> beam.Map(lambda rec: (rec[1], [rec[0], rec[3]]))
        | "CombinePerKey Country Key Goals" >> beam.GroupByKey()
        | "Get key player with max goals" >> beam.Map(lambda cnt_ply: (cnt_ply[0], max(cnt_ply[1], key=lambda ply: int(ply[1]))))
    )

    # step 4
    def join_lists(items):
        team, data = items
        total_goals = data['country_data'][0]
        player_list = data['player_data'][0]
        key_player = player_list[0]
        player_goals = int(player_list[1])
        return team, total_goals, key_player, player_goals

    team_out = (
        {"country_data": country_tot_goals, "player_data": country_max_goals}
        | "Join based on Key" >> beam.CoGroupByKey()
        | "Map on Key" >> beam.Map(join_lists)
        # | "Print Team Out" >> beam.Map(pprint)
    )

    """
    ('Argentina', 1100, 'Messi', 250)
    ('India', 300, 'Chhetri', 200)
    ('Portugal', 500, 'Ronaldo', 200)
    ('Brazil', 1000, 'Pele', 220)
    """

    # step 5
    write_out = (
        team_out
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table=output_table,
            dataset=output_ds,
            schema=output_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        )
    )

Solution

  • The input format of your data is not what WriteToBigQuery expects. Quoting the documentation:

    This transform receives a PCollection of elements to be inserted into BigQuery tables. The elements would come in as Python dictionaries, or as TableRow instances.

    whereas you are providing tuples. Modify your join_lists function to something like this:

    def join_lists(items):
        team, data = items
        total_goals = data['country_data'][0]
        player_list = data['player_data'][0]
        key_player = player_list[0]
        player_goals = int(player_list[1])
    
        return {
          "team": team,
          "total_goals": total_goals,
          "key_player": key_player,
          "player_goals": player_goals
        }
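    To sanity-check the fix without launching a pipeline, the reshaped join_lists can be exercised directly on a hand-built element in the shape CoGroupByKey emits, i.e. (key, dict of grouped values). The sample element below is hypothetical, modelled on the question's data:

    ```python
    def join_lists(items):
        team, data = items
        total_goals = data["country_data"][0]
        player_list = data["player_data"][0]
        return {
            "team": team,
            "total_goals": total_goals,
            "key_player": player_list[0],
            "player_goals": int(player_list[1]),
        }

    # One CoGroupByKey output element: (key, {tag: [values], ...})
    sample = ("Argentina", {"country_data": [1100], "player_data": [["Messi", "250"]]})
    print(join_lists(sample))
    # {'team': 'Argentina', 'total_goals': 1100, 'key_player': 'Messi', 'player_goals': 250}
    ```

    Each element is now a plain dict whose keys match the schema's field names, which is what the Storage Write API sink converts into Beam Rows.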