Tags: python, google-bigquery, etl, google-cloud-dataflow, apache-beam

Getting an error: AttributeError: 'MySchemaClassName' object has no attribute 'element_type' while working with the apache_beam to_dataframe module


Hello, I am in the process of learning apache_beam. Below is a section of a script I wrote that aims to pick up streaming data from a Pub/Sub subscription, transform it, and write the final results into BigQuery. However, I am getting this error: AttributeError: 'BmsSchema' object has no attribute 'element_type' [while running 'Transforming columns with pandas']

Below is my code

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe
import os
import typing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "...........json"


# Define the schema
class BmsSchema(typing.NamedTuple):
    ident: str


beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


# Do function for passing in pubsub message from the subscription
class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json
        # Creating the main_dict that has all the columns
        all_columns = ['ident']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        # Parse the JSON message
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)

        yield {
            all_columns[0]: main_dict[all_columns[0]]
        }


# Do function for formatting the respective columns using Pandas
class PandasTransform(beam.DoFn):
    def process(self, element):
        df = to_dataframe(element)
        # Convert DataFrame back to a list of dictionaries
        for _, row in df.iterrows():
            yield row.to_dict()


def run():
    # Define pipeline options
    options = PipelineOptions(
        project='dw.......',
        runner='DirectRunner',
        streaming=True,
        temp_location='gs://............',
        staging_location='gs://..........',
        region='europe..........',
        job_name='pipeline-dataflow-test'
    )

    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/......./subscriptions/...........'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}

        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    | 'Transforming columns with pandas' >> beam.ParDo(PandasTransform())
                    )

        # Write to BigQuery with schema autodetect
        messages | 'Write to BigQuery' >> WriteToBigQuery(
            table='project.table_name',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://........'
        )


if __name__ == '__main__':
    run() 

Solution

  • to_dataframe expects a PCollection [1], but you are passing it a single element of your PCollection instead. Instead of calling df = to_dataframe(element) inside PandasTransform, you can do something like:

    messages = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                )
    df_messages = to_dataframe(messages)

    and then you can manipulate df_messages with DataFrame operations; see https://beam.apache.org/documentation/dsls/dataframes/overview/ for more info. A fuller end-to-end sketch follows after this answer.

    [1] https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe
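
For completeness, here is a minimal sketch of what the reworked pipeline could look like end to end. It reuses the options, ParsePubSubMessage, BmsSchema, and table_schema from the question; the str.upper() call is only a placeholder transformation (not from the original post), and to_pcollection, also from apache_beam.dataframe.convert, converts the deferred DataFrame back into a PCollection of named tuples before writing to BigQuery:

from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline(options=options) as p:
    messages = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                )

    # Deferred, pandas-style manipulation applied to the whole PCollection
    df_messages = to_dataframe(messages)
    df_messages['ident'] = df_messages['ident'].str.upper()  # placeholder transform

    # Convert the deferred DataFrame back into a PCollection of named tuples,
    # then into plain dicts for WriteToBigQuery
    rows = to_pcollection(df_messages)
    (rows
     | 'To dicts' >> beam.Map(lambda row: row._asdict())
     | 'Write to BigQuery' >> WriteToBigQuery(
         table='project.table_name',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         schema=table_schema,
         custom_gcs_temp_location='gs://........')
     )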