Hello, I am in the process of learning Apache Beam. Below is a section of a script I have written that reads streaming data from a Pub/Sub subscription, transforms it, and writes the final results into BigQuery. However, I am getting this error:

AttributeError: 'BmsSchema' object has no attribute 'element_type' [while running 'Transforming columns with pandas']

Below is my code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe
import os
import typing
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "...........json"
# Define the schema
class BmsSchema(typing.NamedTuple):
    ident: str

beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)
# Do function for passing in pubsub message from the subscription
class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json

        # Creating the main_dict that has all the columns
        all_columns = ['ident']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))

        # Parse the JSON message
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)

        yield {
            all_columns[0]: main_dict[all_columns[0]]
        }
# Do function for formatting the respective columns using Pandas
class PandasTransform(beam.DoFn):
    def process(self, element):
        df = to_dataframe(element)
        # Convert DataFrame back to a list of dictionaries
        for _, row in df.iterrows():
            yield row.to_dict()
def run():
    # Define pipeline options
    options = PipelineOptions(
        project='dw.......',
        runner='DirectRunner',
        streaming=True,
        temp_location='gs://............',
        staging_location='gs://..........',
        region='europe..........',
        job_name='pipeline-dataflow-test'
    )

    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/......./subscriptions/...........'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    | 'Transforming columns with pandas' >> beam.ParDo(PandasTransform())
                    )

        # Write to BigQuery with schema autodetect
        messages | 'Write to BigQuery' >> WriteToBigQuery(
            table='project.table_name',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://........'
        )


if __name__ == '__main__':
    run()
to_dataframe is expecting a PCollection, but you are passing it a single element of your PCollection instead. Rather than calling df = to_dataframe(element) inside PandasTransform, you can do something like:
messages = (p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
            | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
            )

df_messages = to_dataframe(messages)
Then you can manipulate df_messages with dataframe operations. See https://beam.apache.org/documentation/dsls/dataframes/overview/ for more information.
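If you still want plain dictionaries at the end so you can keep your existing WriteToBigQuery step, one possible continuation is to do the column formatting on df_messages and then convert the deferred dataframe back to a PCollection with to_pcollection (from apache_beam.dataframe.convert). This is only a sketch: the str.strip() call is a placeholder for whatever formatting you actually need, and the 'To dicts' step is an assumed extra Map to turn the schema'd rows back into dictionaries.

from apache_beam.dataframe.convert import to_pcollection

# Placeholder dataframe operation on the 'ident' column; replace with the
# formatting you actually need.
df_messages['ident'] = df_messages['ident'].str.strip()

# to_pcollection yields schema'd rows (named tuples) by default, so map them
# back to dicts before handing them to WriteToBigQuery.
results = (to_pcollection(df_messages)
           | 'To dicts' >> beam.Map(lambda row: row._asdict()))

results | 'Write to BigQuery' >> WriteToBigQuery(
    table='project.table_name',
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    schema=table_schema,
    custom_gcs_temp_location='gs://........'
)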