python · apache-beam · beam-sql

How to define nullable fields for SqlTransform


I'm using Beam SqlTransform in python, trying to define/pass nullable fields.

This code works just fine:

with beam.Pipeline(options=options) as p:
    (p
     # ...

     # Use beam.Row to create a schema-aware PCollection
     | "Create beam Row" >> beam.Map(lambda x: beam.Row(
         user_id=int(x['user_id']),
         user_name=str(x['user_name'])
     ))
     | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
    )

However, I am not able to create nullable fields with this approach.

Without the direct cast, I get a Field-decoding error. For example,

user_id = json.get('user_id')

throws:

Failed to decode Schema due to an error decoding Field proto:

name: "user_id"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}

Returning any object other than beam.Row throws a missing-schema error:

Cannot call getSchema when there is no schema

What is the proper way to define nullable fields?


Solution

  • When working with Apache Beam's SqlTransform in Python, nullable fields have to be declared explicitly: beam.Row infers each field's type from the concrete value it is given, so it cannot mark a field nullable on its own. Here are the working approaches:
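    As background, a plain-Python sketch (independent of Beam): "nullable" at the type level is expressed with Optional[X], which is just shorthand for Union[X, None]. UserRow below is an illustrative name, not a Beam API:

```python
from typing import NamedTuple, Optional, Union, get_type_hints

# Optional[int] is shorthand for Union[int, None] -- a value that may be absent.
assert Optional[int] == Union[int, None]

# A typed row whose fields are all allowed to be None:
class UserRow(NamedTuple):
    user_id: Optional[int]
    user_name: Optional[str]

row = UserRow(user_id=None, user_name='alice')
print(get_type_hints(UserRow)['user_id'])
```

This is the same mechanism the options below rely on when attaching type hints to a PCollection.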

    Option 1: Using a NamedTuple with Optional Fields

    The most straightforward way is to declare the row type as a NamedTuple whose fields are Optional, register a RowCoder for it, and emit instances of it (note: Optional[int](...) is not callable in Python, so wrapping values that way does not work):

    from typing import NamedTuple, Optional

    import apache_beam as beam
    from apache_beam import coders

    class UserRow(NamedTuple):
        user_id: Optional[int]    # Optional[...] marks the field nullable
        user_name: Optional[str]

    coders.registry.register_coder(UserRow, coders.RowCoder)

    with beam.Pipeline(options=options) as p:
        (p
         | "Create typed Row" >> beam.Map(lambda x: UserRow(
             user_id=x.get('user_id'),
             user_name=x.get('user_name')
         )).with_output_types(UserRow)
         | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
        )

    Option 2: Using None for Null Values

    Alternatively, keep beam.Row and pass None explicitly. Beam's type inference should resolve the conditional expression to Optional[int] / Optional[str], avoiding the pythonsdk_any logical type:

    with beam.Pipeline(options=options) as p:
        (p
         | "Create beam Row" >> beam.Map(lambda x: beam.Row(
             user_id=int(x['user_id']) if x['user_id'] is not None else None,
             user_name=str(x['user_name']) if x['user_name'] is not None else None
         ))
         | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
        )
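    The conditional cast in Option 2 can be factored into a small helper so each field stays readable (cast_or_none is a hypothetical name, not a Beam API):

```python
def cast_or_none(value, cast):
    """Apply `cast` when a value is present; let None pass through untouched."""
    return cast(value) if value is not None else None

# Inside the beam.Map lambda this would read:
#   user_id=cast_or_none(x.get('user_id'), int)
print(cast_or_none('7', int))   # -> 7
print(cast_or_none(None, int))  # -> None
```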
    

    Option 3: Using an Explicit Row Type

    For more complex schemas, keep beam.Row and attach the row type explicitly via with_output_types. RowTypeConstraint.from_fields takes (name, type) pairs, and Optional[...] marks a field nullable:

    from typing import Optional

    from apache_beam.typehints.row_type import RowTypeConstraint

    row_type = RowTypeConstraint.from_fields([
        ('user_id', Optional[int]),    # nullable
        ('user_name', Optional[str]),  # nullable
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | "Create beam Row" >> beam.Map(lambda x: beam.Row(
             user_id=x.get('user_id'),
             user_name=x.get('user_name')
         )).with_output_types(row_type)
         | 'SQL' >> SqlTransform("SELECT user_id, COUNT(*) AS msg_count FROM PCOLLECTION GROUP BY user_id")
        )
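    To see what the mapping step produces before anything reaches SqlTransform, the same dict-to-row logic can be exercised in plain Python (UserRow is an illustrative stand-in for the row type, not a Beam API):

```python
from typing import NamedTuple, Optional

class UserRow(NamedTuple):
    user_id: Optional[int]
    user_name: Optional[str]

raw = [
    {'user_id': 1, 'user_name': 'alice'},
    {'user_id': None, 'user_name': 'bob'},  # explicit null
    {'user_name': 'carol'},                 # key missing entirely
]

# dict.get returns None both for an explicit null and for a missing key,
# so every record maps cleanly onto the nullable row type.
rows = [UserRow(user_id=r.get('user_id'), user_name=r.get('user_name')) for r in raw]
print(rows[1].user_id)  # -> None
print(rows[2].user_id)  # -> None
```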