I'm struggling to annotate an extra parameter of a DoFn's process method, specifically a timestamp parameter.
Minimal example:
    from typing import Iterable

    import apache_beam as beam
    from apache_beam.transforms.window import TimestampedValue
    from apache_beam.utils.timestamp import TimestampTypes


    class Do(beam.DoFn):
        def process(
            self,
            element: int,
            timestamp: TimestampTypes = beam.DoFn.TimestampParam,
        ) -> Iterable[TimestampedValue[int]]:
            yield TimestampedValue(element, timestamp)
Note: TimestampTypes is an alias for Union[int, float, Timestamp].
This results in mypy stating that the parameter type is incorrect:
Incompatible default for argument "timestamp" (default has type "_DoFnParam", argument has type "Union[int, float, Timestamp]")
However, if I annotate the parameter as _DoFnParam, as the error suggests, the type of timestamp is then wrong at the point where it is used:
    from typing import Iterable

    import apache_beam as beam
    from apache_beam.transforms.core import _DoFnParam
    from apache_beam.transforms.window import TimestampedValue


    class Do(beam.DoFn):
        def process(
            self,
            element: int,
            timestamp: _DoFnParam = beam.DoFn.TimestampParam,
        ) -> Iterable[TimestampedValue[int]]:
            yield TimestampedValue(element, timestamp)
Argument 2 to "TimestampedValue" has incompatible type "_DoFnParam"; expected "Union[int, float, Timestamp]"
Has anyone resolved this discrepancy successfully, or is this a limitation of Beam's type hints that I should exclude from type checking for now?
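As mentioned by @chepner and @user3412205 in the comments, this can be solved by:
1. Annotating the timestamp parameter as: Union[TimestampTypes, _DoFnParam]
2. Narrowing the type at the start of the process method like:

       if not isinstance(timestamp, (int, float, Timestamp)):
           raise TypeError()

   Timestamp is included in the check because, at runtime, Beam replaces the placeholder with a Timestamp object rather than a plain number.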
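To see why the Union plus an isinstance check satisfies the type checker, here is a minimal sketch of the same pattern without Beam. The names `_Sentinel`, `TIMESTAMP_PARAM`, and `invoke` are hypothetical stand-ins for `_DoFnParam`, `beam.DoFn.TimestampParam`, and the runner; they are not part of Beam's API.

```python
from typing import Iterator, List, Tuple, Union


class _Sentinel:
    """Hypothetical stand-in for Beam's _DoFnParam placeholder type."""


# Hypothetical stand-in for beam.DoFn.TimestampParam.
TIMESTAMP_PARAM = _Sentinel()


def process(
    element: int,
    timestamp: Union[int, float, _Sentinel] = TIMESTAMP_PARAM,
) -> Iterator[Tuple[int, float]]:
    # Before this check, a type checker sees the full Union, so the
    # placeholder default is accepted.
    if not isinstance(timestamp, (int, float)):
        raise TypeError(
            f"expected a real timestamp, got {type(timestamp).__name__}"
        )
    # After the check, `timestamp` is narrowed to Union[int, float].
    yield element, float(timestamp)


def invoke(element: int, event_time: float) -> List[Tuple[int, float]]:
    # Mimics a runner that swaps the placeholder for a concrete value.
    return list(process(element, event_time))
```

Calling `invoke(1, 2.5)` yields the element paired with the concrete timestamp, while iterating `process(1)` directly (placeholder left in place) raises TypeError.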
The full minimal example that passes type checking (mypy==1.11.1 and apache-beam==2.58.0) looks like:
    from typing import Iterable, Union

    import apache_beam as beam
    from apache_beam.transforms.core import _DoFnParam
    from apache_beam.transforms.window import TimestampedValue
    from apache_beam.utils.timestamp import Timestamp, TimestampTypes


    class Do(beam.DoFn):
        def process(
            self,
            element: int,
            timestamp: Union[TimestampTypes, _DoFnParam] = beam.DoFn.TimestampParam,
        ) -> Iterable[TimestampedValue[int]]:
            # Narrow the Union for the type checker. Timestamp is included
            # because Beam passes a Timestamp object at runtime.
            if not isinstance(timestamp, (int, float, Timestamp)):
                raise TypeError()
            yield TimestampedValue(element, timestamp)
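If several placeholder parameters need the same check, the narrowing step could be factored into a small helper. The sketch below is an assumption rather than anything Beam provides: `expect_type`, `FakeTimestamp`, and `FakePlaceholder` are hypothetical names, with the two classes standing in for `Timestamp` and `_DoFnParam` so the snippet runs without Beam installed.

```python
from typing import Type, TypeVar

T = TypeVar("T")


def expect_type(value: object, expected: Type[T]) -> T:
    """Return `value` unchanged if it is an instance of `expected`, else raise."""
    if not isinstance(value, expected):
        raise TypeError(
            f"expected {expected.__name__}, got {type(value).__name__}"
        )
    return value


class FakeTimestamp:
    """Hypothetical stand-in for apache_beam.utils.timestamp.Timestamp."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds


class FakePlaceholder:
    """Hypothetical stand-in for the _DoFnParam placeholder."""


# The return type is the class passed as `expected`, so attribute
# access on the result should type-check without a cast.
ts = expect_type(FakeTimestamp(3.0), FakeTimestamp)
seconds = ts.seconds
```

Inside a real process method the equivalent call would be something like `expect_type(timestamp, Timestamp)`, which only accepts the Timestamp case and rejects plain ints and floats, so it is stricter than the isinstance tuple used above.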
Posting the answer as community wiki for the benefit of the community that might encounter this use case in the future. Feel free to edit this answer for additional information.