I need to batch the events in a Faust stream, so I'm using the take()
method. However, I would also like to access the message headers, particularly a timestamp.
Ordinarily you would iterate over the stream using:
async for event in stream.events():
and then read the headers from event.headers. But since we're using the take method:
async for event in stream.take(500, 1):
we can't seem to get access to the raw message. Any idea how to get at it? We're just trying to highlight slow sections of the pipeline by attaching a timestamp to each message as a header, rather than adding it to the value of the sent message.
Is there another raw timestamp that's 'hidden' but accessible that I've missed?
EDIT
Using faust-streaming==0.8.4 so it's definitely up to date
The Event and Message objects have headers as attributes that can be accessed in a stream. Both events and take work with EventT objects, so you should be able to access them the same way. The only difference is that take and its derivatives unpack each EventT object into its value (a dict, for example) and collect those values in a buffer list, whereas events yields one EventT at a time. You can still access individual EventT objects if you set your take buffer size to 1.
There's a function introduced in faust-streaming==0.7.7 called stream.take_with_timestamp that's nearly identical to stream.take and can be used like this:
async for event in stream.take_with_timestamp(1, 1, 'timestamp'):
    print(stream.current_event)
    if stream.current_event is not None:
        print(stream.current_event.headers)
        print(stream.current_event.message.headers)
which will show you the timestamp of each event. The caveat here is that if you set the buffer size to anything >1 and your stream times out, your stream.current_event object will be None.
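Once the headers are reachable, the slow-stage check from the question could look roughly like the sketch below. It is not part of Faust: header_latency is a hypothetical helper, and it assumes the producer stored the send time in a header named 'timestamp' as an ASCII-encoded float of epoch seconds.

```python
import time
from typing import Mapping, Optional


def header_latency(headers: Optional[Mapping[str, bytes]],
                   key: str = "timestamp",
                   now: Optional[float] = None) -> Optional[float]:
    """Return the seconds elapsed since the epoch time stored in a header.

    Assumption: the header value is an ASCII-encoded float such as
    b"1700000000.25". Returns None if the header is missing or malformed.
    """
    if not headers:
        return None
    # dict() accepts a mapping as well as a list of (key, value) pairs.
    raw = dict(headers).get(key)
    if raw is None:
        return None
    try:
        sent_at = float(raw.decode() if isinstance(raw, bytes) else raw)
    except (ValueError, UnicodeDecodeError):
        return None
    current = time.time() if now is None else now
    return current - sent_at
```

Inside the loop you would call it as header_latency(stream.current_event.headers), after the None check on stream.current_event.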
Or you could just mimic the assignments inside take_with_timestamp and access event.message.timestamp inside stream.events():
async for event in stream.events():
    print(event.message.timestamp)
    print(event.headers)
    print(event.message.headers)
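If you need batches larger than 1, one workaround is to batch whole events yourself on top of stream.events(), so every item in a batch keeps its headers. The following is only a sketch under stated assumptions, not Faust's own API: batch_events, FakeEvent and fake_source are hypothetical names, and FakeEvent merely stands in for a real event so the example runs without a broker. Unlike take, this simplified version only checks the time window when a new event arrives, so it does not flush while the source is idle.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import AsyncIterator, Dict, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a Faust event: a value plus headers."""
    value: dict
    headers: Dict[str, bytes] = field(default_factory=dict)


async def batch_events(events: AsyncIterator, max_: int,
                       within: float) -> AsyncIterator[List]:
    """Yield lists of whole events, at most max_ per list.

    A batch is also flushed when an event arrives `within` seconds or more
    after the batch was started. Any leftover events are flushed when the
    source ends.
    """
    buffer: List = []
    started = 0.0
    async for event in events:
        if not buffer:
            started = time.monotonic()
        buffer.append(event)
        if len(buffer) >= max_ or time.monotonic() - started >= within:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


async def fake_source(n: int) -> AsyncIterator[FakeEvent]:
    """Emit n fake events, each carrying a 'timestamp' header."""
    for i in range(n):
        yield FakeEvent(value={"id": i},
                        headers={"timestamp": str(1000.0 + i).encode()})


async def collect() -> List[List[FakeEvent]]:
    batches = []
    async for batch in batch_events(fake_source(5), max_=2, within=10.0):
        batches.append(batch)
    return batches


batches = asyncio.run(collect())
for batch in batches:
    print([(e.value["id"], e.headers["timestamp"]) for e in batch])
```

In an agent you would pass stream.events() in place of fake_source(...) and read event.headers or event.message.timestamp for each event in the batch.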