I want to deploy a streaming Dataflow job that listens to a Pub/Sub topic.
The Pub/Sub message content looks like this:

    {
        "file_path": "gs://my_bucket_name/my_file.csv",
        "transformations": [
            {
                "column_name": "NAME",
                "transformation": "to_upper"
            },
            {
                "column_name": "SURNAME",
                "transformation": "to_lower"
            }
        ]
    }

My problem is that I would like to process the file specified by the message (file_path) and apply the given transformations to each column in the CSV file.
I have tried several ways to achieve this, but none of them worked, and I am wondering whether this is not possible at all or whether I am missing something.

First attempt:

    class ProcessMessage(beam.DoFn):

        def process(self, message):
            from apache_beam.pvalue import TaggedOutput
            try:
                file_path = message.get('file_path')
                yield TaggedOutput('file_path', file_path)
            except Exception as e:
                raise Exception(e)

    with beam.Pipeline(options=pipeline_options) as p:
        file_path = (
            p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic, timestamp_attribute='ts')
              | "Parse JSON" >> beam.Map(json.loads)
              | "Process Message" >> beam.ParDo(ProcessMessage).with_outputs('file_path')
        )
        file_content = (
            p
            | "Read file" >> beam.io.ReadFromText(file_path)
        )

This fails with:
file_pattern must be of type string or ValueProvider; got <DoOutputsTuple main_tag=None tags=('file_path',) transform=<ParDo(PTransform) label=[ParDo(ProcessMessage)]> at 0x1441f9550> instead
Second attempt:

    class ReadFile(beam.DoFn):

        def process(self, element):
            import csv
            import io as io_file
            from apache_beam import io

            file_path = element.get('file_path')
            reader = csv.DictReader(io_file.TextIOWrapper(...))
            for row in reader:
                yield row

    with beam.Pipeline(options=pipeline_options) as p:
        message = (
            p | "Read from Pubsub" >> beam.io.ReadFromPubSub(...)
              | "Parse JSON" >> beam.Map(json.loads)
              | "Process message" >> beam.ParDo(ProcessMessage())
        )
        file_content = (
            message
            | beam.ParDo(ReadFile())
            | beam.Map(print)
        )

This does not raise any error, but it does not print the file lines either.
I know this post is a bit on the long side, but I hope someone can help me.
Your first attempt does not work because ReadFromText takes a string (or a ValueProvider) as its argument, for example a bucket path such as "gs://bucket/file".
In your example you pass it the result of the previous PTransform instead of a string, so it fails when the pipeline is constructed.
Instead, you should use ReadAllFromText, which takes a PCollection of file paths as its input, i.e. the result of the previous PTransform.
Your code also needs to be modified a bit.
If a DoFn produces only one type of output, there is no reason to use TaggedOutput, so just yield the value directly:

    class ProcessMessage(beam.DoFn):

        def process(self, message):
            try:
                file_path = message.get('file_path')
                yield file_path
            except Exception as e:
                raise Exception(e)

Next, ReadAllFromText should be connected to the previous step of the pipeline, not to p:

    file_content = (
        p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=p.options.topic, timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process Message" >> beam.ParDo(ProcessMessage())
        | "Read file" >> beam.io.ReadAllFromText()
    )

Be aware that file_content will be a PCollection in which each element is a single line of your CSV file, as a string.
That makes it harder to apply transformations per column: one element holds the column names, and every other element is a bare row with no column names attached.
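To make the shape of that data concrete, here is a minimal sketch in plain Python, with a list of strings standing in for the elements ReadAllFromText would emit for one file. The sample values, the ';' delimiter, and the helper name parse_line are assumptions for illustration only.

```python
import csv

# Made-up stand-ins for the string elements emitted for a single CSV file.
lines = ["NAME;SURNAME", "john;SMITH", "jane;DOE"]


def parse_line(line, delimiter=";"):
    """Split one CSV line into a list of field values."""
    return next(csv.reader([line], delimiter=delimiter))


# The header is just another element; rows only become useful once paired with it.
header = parse_line(lines[0])
rows = [dict(zip(header, parse_line(line))) for line in lines[1:]]
```

In this in-memory list the header is easy to find because it is known to be first. In a real pipeline each element is processed on its own and ordering should not be relied on, so the header is not readily at hand when a row is processed, which is what makes per-column transformations awkward.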
Your second try seems to be better for this:

    import csv
    import io

    import apache_beam as beam

    class ApplyTransforms(beam.DoFn):

        def process(self, element):
            file_path = element.get('file_path')
            transformations = element.get('transformations')

            # Open the file from GCS and read it as CSV, one dict per row
            with beam.io.gcsio.GcsIO().open(file_path) as file:
                reader = csv.DictReader(io.TextIOWrapper(file, encoding="utf-8"), delimiter=';')
                for row in reader:
                    for transform in transformations:
                        col_name = transform.get("column_name")
                        transformation = transform.get("transformation")
                        # apply your transform per row
                    yield row

Something like this could work, but it would probably be better to split it into two classes: one for reading the file and another for applying the transformations :)
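As a rough sketch of that split, the reading and transforming logic could live in two plain functions, each of which could then be wrapped in its own DoFn. The names TRANSFORMATIONS, read_rows and apply_transformations are hypothetical, and the mapping of "to_upper" and "to_lower" to str.upper and str.lower is an assumption about what those transformation names mean. An in-memory stream stands in for the GCS file here.

```python
import csv
import io

# Hypothetical registry: transformation names from the message -> callables.
TRANSFORMATIONS = {
    "to_upper": str.upper,
    "to_lower": str.lower,
}


def read_rows(text_stream, delimiter=";"):
    """Yield each CSV row as a dict keyed by the header line."""
    yield from csv.DictReader(text_stream, delimiter=delimiter)


def apply_transformations(row, transformations):
    """Return a copy of row with each requested column transformation applied."""
    result = dict(row)
    for spec in transformations:
        column = spec["column_name"]
        # An unknown transformation name raises KeyError here.
        func = TRANSFORMATIONS[spec["transformation"]]
        if column in result:
            result[column] = func(result[column])
    return result


# Usage with a made-up in-memory file in place of the GCS object.
sample = io.StringIO("NAME;SURNAME\njohn;SMITH\n")
transformations = [
    {"column_name": "NAME", "transformation": "to_upper"},
    {"column_name": "SURNAME", "transformation": "to_lower"},
]
transformed = [apply_transformations(row, transformations) for row in read_rows(sample)]
```

Keeping the two steps as separate functions also makes them easy to test without running a pipeline.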