in get_csv_reader NameError: name 'beam' is not defined [while running 'Flatten the CSV-ptransform-73']
I am trying to read a CSV with Apache Beam and load it into BigQuery (BQ). I use the first line of the CSV (the headers) to convert every row into a dictionary before loading it into BQ.
Below is my pipeline code:
# pipeline
def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output_table",
        help="Output BigQuery table for results specified as: "
             "PROJECT:DATASET.TABLE or DATASET.TABLE.",
    )
    parser.add_argument(
        "--input_file",
        help="file location of input data "
             '"GCS path"',
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    input_file = known_args.input_file
    output_table = known_args.output_table
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # headers = ['id', '_title12764THE_', 'type', 'description', 'release_year', 'age_certification', 'runtime', 'genres', 'production_countries', 'seasons', 'imdb_id', 'imdb_score', 'imdb_votes', 'tmdb_popularity', 'tmdb_score']
    headers = read_headers(input_file)

    with beam.Pipeline(options=pipeline_options) as p:
        # Create Pipeline (PCollections)
        parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
        flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))
        convert = flattened_file | "bq convert" >> beam.Map(lambda x: parse_csv(x, headers))
        convert | 'Write to bq' >> beam.io.WriteToBigQuery(
            output_table,
            schema='SCHEMA_AUTODETECT',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
Method to read the headers:
# reading headers
def read_headers(csv_file):
    fs = gcsfs.GCSFileSystem(project='hidden-mapper-351214')
    with fs.open(csv_file, 'r') as f:
        header_line = f.readline().strip()
    headers = next(csv.reader([header_line]))
    # BQ column name requirements
    BQ_headers = []
    for head in headers:
        BQ_headers.append(re.sub(r'\W+', '', head.lstrip('0123456789')))
    return BQ_headers
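For example, a hypothetical header like '123 imdb score!' would be cleaned up to 'imdbscore' so it satisfies BQ's column-name rules:

>>> re.sub(r'\W+', '', '123 imdb score!'.lstrip('0123456789'))
'imdbscore'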
Method to read the CSV into a PCollection:
def get_csv_reader(readable_file):
    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
    # Read file as a CSV
    gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))
    # next(gcs_reader)
    return gcs_reader
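parse_csv is not shown above; it essentially just zips the cleaned headers with each row to build a dict, roughly like this:

# Rough sketch of parse_csv (exact implementation omitted above):
# pair the BQ-safe headers with the values of one CSV row.
def parse_csv(row, headers):
    return dict(zip(headers, row))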
When I run the code I get the following error:
File "/home/akhil_kakumanu/ingest-demo/ingest-csv.py", line 32, in get_csv_reader
NameError: name 'beam' is not defined [while running 'Flatten the CSV-ptransform-73']
My line 32 is this:
gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
My code initially worked fine when I hard-coded the headers in a list, but when I use GCSFS it starts throwing that error. I think it has something to do with opening the same file with two different libraries, but I'm not sure. I have the Beam SDK and GCSFS installed. Like I said, it worked fine when I hard-coded the headers.
Can anyone tell me why this is happening and how to get around it? Also, if there is a more effective way to read any CSV and push it to BQ, please suggest it.
Can you try adding
import apache_beam as beam
inside the function get_csv_reader?
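For example (your function with the import added inside; csv and io may need the same treatment if they are only imported at module level):

def get_csv_reader(readable_file):
    # Import here so the module is available on the worker even if the
    # pickled main session doesn't carry it over.
    import apache_beam as beam
    import csv
    import io

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
    # Read the file as a CSV
    return csv.reader(io.TextIOWrapper(gcs_file))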
The issue with a NameError is usually that the worker doesn't know about values from the global namespace. See https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors.