I am trying to run the program below using Apache Beam on GCP Dataflow. The program should read a CSV file, perform some transformations (sum, max, and a join), and then write the result to a BigQuery table. Up to step 4 I get the expected result, but at step 5 the job fails to write to BigQuery with the error below.
Please help me solve it.
Error:
TypeError: 'tuple' object does not support item assignment [while running 'Write to BQ/StorageWriteToBigQuery/Convert dict to Beam Row/Convert dict to Beam Row']
Code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import argparse
from pprint import pprint
parser = argparse.ArgumentParser()
parser.add_argument("--in_file", dest="in_file", required=True, help="input GCS file")
parser.add_argument("--out_table", dest="out_table", required=True, help="output BQ Table")
parser.add_argument("--out_ds", dest="out_ds", required=True, help="output BQ DS")
path_args, pipeline_args = parser.parse_known_args()
input_file = path_args.in_file
output_table = path_args.out_table
output_ds = path_args.out_ds
output_schema = {
    "fields": [
        {"name": "team", "type": "STRING", "mode": "NULLABLE"},
        {"name": "total_goals", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "key_player", "type": "STRING", "mode": "NULLABLE"},
        {"name": "player_goals", "type": "INTEGER", "mode": "NULLABLE"},
    ]
}
pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as pipeline:
    # step 1: read the CSV and split each line into a list of fields
    input_stats = (
        pipeline
        | "Read File" >> beam.io.ReadFromText(file_pattern=input_file, skip_header_lines=1)
        | "Split Line" >> beam.Map(lambda line: line.split(","))
    )

    # step 2: total goals per country
    country_tot_goals = (
        input_stats
        | "Country Goals Key Value Pair" >> beam.Map(lambda rec: (rec[1], int(rec[2])))
        | "CombinePerKey Country Total Goals" >> beam.CombinePerKey(sum)
    )

    # step 3: per country, find the player with the most goals
    country_max_goals = (
        input_stats
        | "Country Player Goals Key Value Pairs" >> beam.Map(lambda rec: (rec[1], [rec[0], rec[3]]))
        | "GroupByKey Country Player Goals" >> beam.GroupByKey()
        | "Get key player with max goals" >> beam.Map(
            lambda cnt_ply: (cnt_ply[0], max(cnt_ply[1], key=lambda ply: int(ply[1])))
        )
    )

    # step 4: join the two per-country results on the country key
    def join_lists(items):
        team, data = items
        total_goals = data['country_data'][0]
        player_list = data['player_data'][0]
        key_player = player_list[0]
        player_goals = int(player_list[1])
        return team, total_goals, key_player, player_goals

    team_out = (
        {"country_data": country_tot_goals, "player_data": country_max_goals}
        | "Join based on Key" >> beam.CoGroupByKey()
        | "Map on Key" >> beam.Map(join_lists)
        # | "Print Team Out" >> beam.Map(pprint)
    )
    """
    ('Argentina', 1100, 'Messi', 250)
    ('India', 300, 'Chhetri', 200)
    ('Portugal', 500, 'Ronaldo', 200)
    ('Brazil', 1000, 'Pele', 220)
    """

    # step 5: write the joined rows to BigQuery
    write_out = (
        team_out
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table=output_table,
            dataset=output_ds,
            schema=output_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        )
    )
The input format of your data is not what WriteToBigQuery expects. Quoting from the documentation:
This transform receives a PCollection of elements to be inserted into BigQuery tables. The elements would come in as Python dictionaries, or as TableRow instances.
while you are providing tuples, which is why the job fails inside the 'Convert dict to Beam Row' step. You need to modify your join_lists function so that it returns a dictionary whose keys match the schema field names, something like this:
def join_lists(items):
    team, data = items
    total_goals = data['country_data'][0]
    player_list = data['player_data'][0]
    key_player = player_list[0]
    player_goals = int(player_list[1])
    # Return a dict keyed by the BigQuery column names instead of a tuple
    return {
        "team": team,
        "total_goals": total_goals,
        "key_player": key_player,
        "player_goals": player_goals,
    }
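If you would rather keep join_lists returning a tuple, an alternative is to convert to dictionaries in a dedicated step just before the sink. A minimal sketch (FIELD_NAMES and the "Tuple to Dict" step label are illustrative names I am introducing, not part of your original code):

# Hypothetical conversion step: zip each (team, total_goals, key_player,
# player_goals) tuple with the schema's field names to build a dict.
FIELD_NAMES = ["team", "total_goals", "key_player", "player_goals"]

write_out = (
    team_out
    | "Tuple to Dict" >> beam.Map(lambda row: dict(zip(FIELD_NAMES, row)))
    | "Write to BQ" >> beam.io.WriteToBigQuery(
        table=output_table,
        dataset=output_ds,
        schema=output_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    )
)

Either way, the dictionary keys and value types must line up with the fields declared in output_schema, since the STORAGE_WRITE_API path builds Beam Rows from those dicts.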