google-cloud-dataflow, apache-beam, google-cloud-pubsub, google-cloud-bigtable, bigtable

How to stream data from Pub/Sub to Google BigTable using DataFlow?


I want to ask if someone can tell me, or even show me an example of, a Dataflow job template, preferably in Python, in which I can:

  1. Continuously read JSON data from a Pub/Sub topic
  2. Process this data / Enrich it with custom logic
  3. Load the data into an existing BigTable table

I've tried delving into the docs of all 3 products but I found myself in a rabbit hole of undocumented APIs.

I tried using Apache Beam in Python to at least get such a pipeline working, for example with this definition:

with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(
            subscription=pipeline_options.input_subscription
        )
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
        | "Writing row object to BigTable"
        >> WriteToBigTable(
            project_id=pipeline_options.bigtable_project,
            instance_id=pipeline_options.bigtable_instance,
            table_id=pipeline_options.bigtable_table,
        )
    )

I am not sure the json.loads step even works, and if it does, in what format the data reaches my ProcessMessage class. I tried to make the class generic regardless of which keys come in, but it still fails with errors I can't understand:

class ProcessMessage(beam.DoFn):

    def process(self, message):
        from google.cloud.bigtable import row as row_
        import datetime
        bt_row = row_.DirectRow(row_key=message.get('id'))
        for k, v in message.items():
            bt_row.set_cell("default", k.encode(), str(v).encode(), datetime.datetime.now())
        yield bt_row
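
For reference, my understanding is that ReadFromPubSub (with the default with_attributes=False) emits the raw message payload as bytes, so after json.loads the DoFn should receive a plain dict. A minimal local check along these lines (no runner involved; the payload is just the sample message shown further down) should show what the DoFn sees and yields:

import json

payload = b'{"id": "12345", "somekey": "somevalue", "somekey2": ["some other value"]}'
message = json.loads(payload)                   # -> plain dict
rows = list(ProcessMessage().process(message))  # call the DoFn directly
print(rows[0].row_key)                          # expect b'12345'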

It's also very unclear to me how to transform my JSON message (which might not be flat) streamed from Pub/Sub:

{
   "id": "12345",
   "somekey": "somevalue",
   "somekey2": ["some other value"]
}

into a Bigtable row where all the keys are dynamically turned into columns. I know Bigtable requires a unique row key, so I have an ID, but I have no idea how to specify it in the code.
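
For the example above, what I'm hoping to end up with is roughly one Bigtable row per message, keyed by the id and with one column per JSON key under a single column family (called cf here purely for illustration):

row key: 12345
    cf:somekey   ->  "somevalue"
    cf:somekey2  ->  "['some other value']"   (or one column per list element)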


Solution

  • Have you seen the dataflow cookbook examples on GitHub?

    Below is code for an Apache Beam pipeline that reads from a Pub/Sub subscription and writes to Bigtable, using your input as an example:

        import logging
        
        import apache_beam as beam
        from apache_beam.io import ReadFromPubSub
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.transforms.core import DoFn
        from google.cloud.bigtable.row import DirectRow
        from apache_beam.io.gcp.bigtableio import WriteToBigTable
        
        class ConvertToJson(beam.DoFn):
            def process(self, element):
                import json
                yield json.loads(element)
        
        class MakeBigtableRow(DoFn):
            def process(self, element):
                row = DirectRow(row_key=str(element['id']))
                for key, value in element.items():
                    row.set_cell(
                        column_family_id='cf1',
                        column=key,
                        value=str(value)
                    )
                yield row
        
        def run():
            class ReadPubSubOptions(PipelineOptions):
                @classmethod
                def _add_argparse_args(cls, parser):
                    parser.add_argument(
                        "--subscription",
                        required=True,
                        help="PubSub subscription to read.",
                    )
                    parser.add_argument(
                        "--project_id",
                        required=True,
                        help="Project ID"
                    )
                    parser.add_argument(
                        "--instance_id",
                        required=True,
                        help="Cloud Bigtable instance ID"
                    )
                    parser.add_argument(
                        "--table_id",
                        required=True,
                        help="Cloud Bigtable table ID"
                    )
            options = ReadPubSubOptions(streaming=True)
        
            with beam.Pipeline(options=options) as p:
                (
                    p
                    | "Read PubSub subscription"
                    >> ReadFromPubSub(subscription=options.subscription)
                    | "Convert to JSON" >> beam.ParDo(ConvertToJson())
                    | 'Map to Bigtable Row' >> beam.ParDo(MakeBigtableRow())
                    | "Write to BigTable" >> WriteToBigTable(
                        project_id=options.project_id,
                        instance_id=options.instance_id,
                        table_id=options.table_id
                    )
                    | beam.Map(logging.info)
                )
        
        if __name__ == "__main__":
            logging.getLogger().setLevel(logging.INFO)
            run()
    

    (screenshot of the resulting rows in the Bigtable UI)
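
    Note that set_cell stores exactly the bytes/string it is given, so a nested value like the list in somekey2 ends up as its str() representation in a single cell. If you would rather get one column per nested key or list element, a variant of MakeBigtableRow along these lines could flatten the element first (just a sketch reusing the imports above; the dotted column names are my own convention):

        class MakeFlattenedBigtableRow(DoFn):
            def _flatten(self, value, prefix=""):
                # Recursively turn nested dicts/lists into (column_name, value) pairs,
                # e.g. {"a": {"b": 1}} -> ("a.b", 1) and {"somekey2": ["x"]} -> ("somekey2.0", "x").
                if isinstance(value, dict):
                    for k, v in value.items():
                        yield from self._flatten(v, f"{prefix}{k}.")
                elif isinstance(value, list):
                    for i, v in enumerate(value):
                        yield from self._flatten(v, f"{prefix}{i}.")
                else:
                    yield prefix.rstrip("."), value

            def process(self, element):
                row = DirectRow(row_key=str(element["id"]))
                for column, value in self._flatten(element):
                    row.set_cell(
                        column_family_id='cf1',  # the column family is not created by the connector
                        column=column,
                        value=str(value)
                    )
                yield row

    Two things that are easy to miss: the column family (cf1 here) has to exist on the table before the pipeline writes to it, and ReadPubSubOptions(streaming=True) already puts the pipeline in streaming mode, so when launching on Dataflow you only need to add the usual --runner=DataflowRunner, --project, --region and --temp_location options on top of the four custom arguments.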