I'm using Beam SqlTransform in Python and trying to define/pass nullable fields.
This code works just fine:
with beam.Pipeline(options=options) as p:
    (p
     # ...
     # Use beam.Row to create a schema-aware PCollection
     | "Create beam Row" >> beam.Map(lambda x: beam.Row(
         user_id=int(x['user_id']),
         user_name=str(x['user_name'])
     ))
     | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
    )
However, I am not able to create nullable fields with this approach. Without the direct cast, I get a Field-decoding error. For example,
user_id = json.get('user_id')
throws:
Failed to decode Schema due to an error decoding Field proto:
name: "user_id"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
Passing anything other than a beam.Row object throws a missing-schema error instead:
Cannot call getSchema when there is no schema
What is the proper way to define nullable fields?
When working with Apache Beam's SqlTransform in Python, you need to declare field nullability explicitly. The pythonsdk_any logical type in your error is Beam's catch-all Any type: when a field's value can be None (or its type otherwise can't be inferred), Beam falls back to Any, and the Java SQL expansion service can't decode it. Here are the approaches that work:

Option 1: Using beam.Row with Optional Types

The most straightforward way is to keep beam.Row but declare the field types yourself with with_output_types, using Python's Optional type hint to mark fields nullable:
from typing import Optional

from apache_beam.typehints.row_type import RowTypeConstraint

with beam.Pipeline(options=options) as p:
    (p
     | "Create beam Row" >> beam.Map(lambda x: beam.Row(
         user_id=x.get('user_id'),      # may be None
         user_name=x.get('user_name')
     )).with_output_types(RowTypeConstraint.from_fields([
         ('user_id', Optional[int]),    # Optional[...] marks the field nullable
         ('user_name', Optional[str]),
     ]))
     | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
    )
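With the output types pinned, Beam no longer infers the field types from the values, so a missing user_id simply becomes a NULL in a nullable column instead of triggering the pythonsdk_any decoding error.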
Option 2: Using None for Null Values

Alternatively, you can explicitly pass None for null values:
with beam.Pipeline(options=options) as p:
    (p
     | "Create beam Row" >> beam.Map(lambda x: beam.Row(
         user_id=int(x['user_id']) if x['user_id'] is not None else None,
         user_name=str(x['user_name']) if x['user_name'] is not None else None
     ))
     | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
    )
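This works when Beam's type inference resolves the conditional expression to Optional[int] / Optional[str]. If inference still falls back to Any on your Beam version, combine the conditionals with the explicit with_output_types from Option 1.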
Option 3: Using an Explicit Schema Definition

For more complex schemas, you can define the schema explicitly as a NamedTuple and register it with RowCoder (this is the pattern the Beam SQL examples use):

import typing

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

class UserRow(typing.NamedTuple):
    user_id: typing.Optional[int]      # Optional[...] => nullable field
    user_name: typing.Optional[str]

beam.coders.registry.register_coder(UserRow, beam.coders.RowCoder)

with beam.Pipeline(options=options) as p:
    (p
     | "Create rows" >> beam.Map(lambda x: UserRow(
         user_id=x.get('user_id'),
         user_name=x.get('user_name')
     )).with_output_types(UserRow)
     | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
    )
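To sanity-check that nulls survive end to end, here is a minimal sketch reusing the UserRow type from Option 3 on made-up sample data (SqlTransform is a cross-language transform, so a Java runtime must be available for its expansion service):

with beam.Pipeline() as p:
    (p
     | beam.Create([
         {'user_id': 1, 'user_name': 'alice'},
         {'user_id': None, 'user_name': None},  # null values for both fields
     ])
     | beam.Map(lambda x: UserRow(
         user_id=x.get('user_id'),
         user_name=x.get('user_name')
     )).with_output_types(UserRow)
     | SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
     | beam.Map(print))  # expect one row per distinct user_id, including NULL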