I want to ask if someone can tell me, or even show me an example of, a Dataflow job template, preferably in Python, in which I can read JSON messages from a Pub/Sub subscription, transform them, and write them to Bigtable.
I've tried delving into the docs of all three products, but I found myself in a rabbit hole of undocumented APIs.
I tried using Apache Beam in Python to at least get such a pipeline working, using, for example, this definition:
with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(
            subscription=pipeline_options.input_subscription
        )
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
        | "Writing row object to BigTable"
        >> WriteToBigTable(
            project_id=pipeline_options.bigtable_project,
            instance_id=pipeline_options.bigtable_instance,
            table_id=pipeline_options.bigtable_table,
        )
    )
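For completeness, pipeline_options here comes from a custom PipelineOptions subclass, roughly along these lines (the class name is just illustrative; only the argument names matter):

from apache_beam.options.pipeline_options import PipelineOptions

class MyPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input_subscription",
                            help="Pub/Sub subscription to read from")
        parser.add_argument("--bigtable_project",
                            help="GCP project that owns the Bigtable instance")
        parser.add_argument("--bigtable_instance",
                            help="Bigtable instance ID")
        parser.add_argument("--bigtable_table",
                            help="Bigtable table ID")

pipeline_options = MyPipelineOptions(streaming=True)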
I'm not sure the json.loads step even works, and if it does, in what format the message reaches my ProcessMessage class. I tried to make the class generic, regardless of which keys come in, but it still fails with errors I can't understand:
class ProcessMessage(beam.DoFn):
    def process(self, message):
        from google.cloud.bigtable import row as row_
        import datetime

        bt_row = row_.DirectRow(row_key=message.get('id'))
        for k, v in message.items():
            bt_row.set_cell("default", k.encode(), str(v).encode(), datetime.datetime.now())
        yield bt_row
It's very unclear to me how to transform my JSON message streaming from Pub/Sub, which might not be flat:
{
    "id": "12345",
    "somekey": "somevalue",
    "somekey2": ["some other value"]
}
into a row in Bigtable where all the keys are dynamically turned into columns. I know Bigtable requires a unique row key, so I have an ID for that, but I have no idea how to specify it in the code.
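To make the goal concrete: outside of Beam I can write that one example message with the plain Bigtable client, roughly like this (the project, instance, table, and column family names are placeholders), so it's the equivalent step inside the Beam/Dataflow pipeline that I can't work out:

import datetime
import json

from google.cloud import bigtable

# Placeholder names; the column family "cf1" already exists on my table.
client = bigtable.Client(project="my-project")
table = client.instance("my-instance").table("my-table")

message = json.loads(
    '{"id": "12345", "somekey": "somevalue", "somekey2": ["some other value"]}'
)

# "id" becomes the row key; each key/value pair becomes a column in one family.
row = table.direct_row(message["id"].encode())
for key, value in message.items():
    row.set_cell("cf1", key.encode(), str(value).encode(),
                 timestamp=datetime.datetime.utcnow())
row.commit()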
Have you seen the Dataflow cookbook examples on GitHub?
Below is code showing an Apache Beam pipeline that reads from a Pub/Sub subscription and writes to Bigtable, using your input as an example. Note that ReadFromPubSub hands each message payload to the pipeline as bytes, and json.loads accepts bytes directly (Python 3.6+), so everything downstream of the "Convert to JSON" step sees a plain Python dict:
import logging

import apache_beam as beam
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import DoFn
from google.cloud.bigtable.row import DirectRow
from apache_beam.io.gcp.bigtableio import WriteToBigTable


class ConvertToJson(beam.DoFn):
    def process(self, element):
        import json
        # Pub/Sub delivers the payload as bytes; json.loads accepts
        # bytes directly and yields a plain Python dict.
        yield json.loads(element)


class MakeBigtableRow(DoFn):
    def process(self, element):
        # The "id" field becomes the unique row key.
        row = DirectRow(row_key=str(element['id']))
        # Every key of the message becomes a column in the "cf1"
        # column family, which must already exist on the table.
        for key, value in element.items():
            row.set_cell(
                column_family_id='cf1',
                column=key,
                value=str(value)
            )
        yield row


def run():
    class ReadPubSubOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument(
                "--subscription",
                required=True,
                help="Pub/Sub subscription to read.",
            )
            parser.add_argument(
                "--project_id",
                required=True,
                help="Project ID",
            )
            parser.add_argument(
                "--instance_id",
                required=True,
                help="Cloud Bigtable instance ID",
            )
            parser.add_argument(
                "--table_id",
                required=True,
                help="Cloud Bigtable table ID",
            )

    options = ReadPubSubOptions(streaming=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read PubSub subscription"
            >> ReadFromPubSub(subscription=options.subscription)
            | "Convert to JSON" >> beam.ParDo(ConvertToJson())
            | "Map to Bigtable Row" >> beam.ParDo(MakeBigtableRow())
            | "Write to BigTable"
            >> WriteToBigTable(
                project_id=options.project_id,
                instance_id=options.instance_id,
                table_id=options.table_id,
            )
            | beam.Map(logging.info)
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
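One thing this example doesn't do is treat nested JSON specially: str(value) simply stringifies lists and dicts into a single cell. If you'd rather get one column per nested key, you could drop a flattening step between "Convert to JSON" and "Map to Bigtable Row". Here is a sketch; the dotted-qualifier naming is just a convention I chose, not something WriteToBigTable requires:

class FlattenJson(beam.DoFn):
    """Flatten nested dicts into dotted column qualifiers,
    e.g. {"a": {"b": 1}} becomes {"a.b": 1}; lists are JSON-encoded."""

    def process(self, element):
        import json

        flat = {}

        def _flatten(prefix, value):
            if isinstance(value, dict):
                for key, nested in value.items():
                    _flatten(f"{prefix}.{key}" if prefix else key, nested)
            elif isinstance(value, list):
                # Keep lists as a single JSON-encoded cell value.
                flat[prefix] = json.dumps(value)
            else:
                flat[prefix] = value

        _flatten("", element)
        yield flat

# In the pipeline:
# ... | "Convert to JSON" >> beam.ParDo(ConvertToJson())
#     | "Flatten nested keys" >> beam.ParDo(FlattenJson())
#     | "Map to Bigtable Row" >> beam.ParDo(MakeBigtableRow())
#     | ...

The top-level "id" key is left untouched by this, so MakeBigtableRow can keep using it as the row key.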