I am working on a Beam (Dataflow) pipeline where the task is to read messages from Pub/Sub and then perform some transformations. If there is a failure in any of these transformations, I want to send the message to a dead-letter queue.
Flow looks something like this
// read from pubsub
PCollection<PubsubMessage> message = p.apply("Read", PubsubIO.readMessagesWithAttributes().fromSubscription("subscription_name"));
// Transformation1
// Transformation2
// Save
The message is the input to Transformation1, and the output of Transformation1 is the input to Transformation2.
I am able to send the original message to the dead-letter queue if there is an exception in Transformation1. The problem comes when I try to do the same when Transformation2 fails. My requirement is that the original message must go to the dead-letter queue even when Transformation2 fails. Is there a way to do it?
In Transformation2 I tried using a side input, like this:
PCollectionView<List<PubsubMessage>> messageView = message.apply(View.asList()); // <-- error
PCollection<TableRow> valid_table_rows = transformation1_Result.apply("Transformation2", ParDo.of(new transformation2()).withSideInputs(messageView));
But at runtime this gives me an error:
[2023-05-18, 16:17:52 EDT] {beam.py:127} WARNING - Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
The question is: how can I send the original message to the dead-letter queue, regardless of which transformation fails?
Any help will be much appreciated.
It's not perfect, but I think you have to carry your initial message through each step and transformation.
You have to be careful with serialization if you are using SerializableCoder, because you can't put a PubsubMessage directly into a Serializable object. You can create an intermediate object holding the fields you need from PubsubMessage:
public class InputMessage implements Serializable {
    private final byte[] payload;
    private final Map<String, String> attributes;

    public InputMessage(byte[] payload, Map<String, String> attributes) {
        this.payload = payload;
        this.attributes = attributes;
    }
}
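To check that such a wrapper really is Java-serializable (which is what SerializableCoder relies on), a quick round-trip with plain Java serialization can be run outside the pipeline. This standalone sketch uses a minimal stand-in for the InputMessage wrapper; the class and field names are illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class SerializationCheck {
    // Minimal stand-in for the InputMessage wrapper above.
    static class InputMessage implements Serializable {
        final byte[] payload;
        final Map<String, String> attributes;
        InputMessage(byte[] payload, Map<String, String> attributes) {
            this.payload = payload;
            this.attributes = attributes;
        }
    }

    public static void main(String[] args) throws Exception {
        InputMessage original = new InputMessage(
            "hello".getBytes(StandardCharsets.UTF_8),
            Collections.singletonMap("source", "pubsub"));

        // Serialize, then deserialize, as SerializableCoder would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        InputMessage copy;
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (InputMessage) in.readObject();
        }
        System.out.println(new String(copy.payload, StandardCharsets.UTF_8));
        System.out.println(copy.attributes.get("source"));
    }
}
```

If one of the fields were not serializable (a raw gRPC handle, for instance), this round-trip would fail immediately with a NotSerializableException instead of failing at runtime inside Dataflow.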
Then carry this object through each transformation. Example:
class YourObject implements Serializable {
    private InputMessage message;
    // other fields.

    public void setMessage(InputMessage message) { this.message = message; }
    public InputMessage getMessage() { return message; }
}
pipeline
    .apply("Map", MapElements.into(TypeDescriptor.of(YourObject.class)).via(this::yourTransform))
public YourObject yourTransform(final PubsubMessage pubsubMessage) {
    // Apply your different transformations.
    YourObject yourObject = new YourObject();
    yourObject.setMessage(new InputMessage(pubsubMessage.getPayload(), pubsubMessage.getAttributeMap()));
    return yourObject;
}
Then, in the catch block, with a side output and a TupleTag, you can emit the original input object to the dead-letter queue.
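Concretely, the side-output pattern in the Beam Java SDK looks roughly like the sketch below. The tag names, the toTableRow helper, and the DLQ sink are illustrative placeholders; the real Transformation2 logic goes inside the try block:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Tags for the main output and the dead-letter output (names are illustrative).
final TupleTag<TableRow> validTag = new TupleTag<TableRow>() {};
final TupleTag<InputMessage> dlqTag = new TupleTag<InputMessage>() {};

PCollectionTuple results = transformation1_Result.apply("Transformation2",
    ParDo.of(new DoFn<YourObject, TableRow>() {
        @ProcessElement
        public void processElement(@Element YourObject element, MultiOutputReceiver out) {
            try {
                // Your real Transformation2 logic; toTableRow is a placeholder.
                out.get(validTag).output(toTableRow(element));
            } catch (Exception e) {
                // On failure, emit the original message carried inside the element.
                out.get(dlqTag).output(element.getMessage());
            }
        }
    }).withOutputTags(validTag, TupleTagList.of(dlqTag)));

// The main output continues to the "Save" step; the DLQ output gets its own sink,
// e.g. a PubsubIO write or a BigQuery error table.
```

Because every element of Transformation2's input already carries the original message, no side input (and therefore no GroupByKey over the unbounded stream) is needed, which avoids the error from the question.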