I'm trying to join two unbounded sources using the Apache Beam Java SDK. While joining, I'm getting the error message below.
Exception in thread "main" java.lang.UnsupportedOperationException: Joining unbounded PCollections is currently only supported for non-global windows with triggers that are known to produce output once per window,such as the default trigger with zero allowed lateness. In these cases Beam can guarantee it joins all input elements once per window. WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), accumulationMode=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST} is not supported
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.access$1500(BeamJoinRel.java:98)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:330)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:308)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:67)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:48)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:49)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:65)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:100)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:167)
    at xyz.xyz.main(xyz.java:64)
I have tried both fixed and sliding windows, along with triggering (pastEndOfWindow and pastFirstElementInPane) and zero allowed lateness. I tried both accumulating and discarding fired panes. I get the same error message every time.
Below are the two snippets I tried, one with a fixed window and one with a sliding window.
p1.apply("window",
    Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
            SlidingWindows
                .of(Duration.standardSeconds(30))
                .every(Duration.standardSeconds(5)))
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
I simply want to implement a SQL transform with a sliding window, a trigger with a delay, and allowed lateness. Kindly guide me on how to implement it.
Thanks, Gowtham
From the comment, if I understand it correctly, the desired behavior is:
Basically, it is a continuous sliding match over the last 30 minutes of data in both streams, with results emitted every 30 seconds.
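To make that reading concrete, here is a minimal plain-Java sketch of the semantics, not Beam code. The `Event` class, the equality-on-key join condition, and the half-open lookback interval are all my assumptions for illustration; only the 30-minute lookback and the 30-second emission period come from the description above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class SlidingMatchSemantics {
    // The two durations come from the description above.
    static final Duration LOOKBACK = Duration.ofMinutes(30);
    static final Duration EMIT_EVERY = Duration.ofSeconds(30);

    /** Hypothetical event shape: a join key plus an event timestamp. */
    static final class Event {
        final String key;
        final Instant timestamp;

        Event(String key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** True if the event lies in the assumed interval (emitTime - LOOKBACK, emitTime]. */
    static boolean inLookback(Event e, Instant emitTime) {
        return e.timestamp.isAfter(emitTime.minus(LOOKBACK))
            && !e.timestamp.isAfter(emitTime);
    }

    /** All (left, right) pairs with equal keys where both events are inside the lookback. */
    static List<Event[]> matchesAt(Instant emitTime, List<Event> left, List<Event> right) {
        List<Event[]> out = new ArrayList<>();
        for (Event l : left) {
            if (!inLookback(l, emitTime)) {
                continue;
            }
            for (Event r : right) {
                if (inLookback(r, emitTime) && l.key.equals(r.key)) {
                    out.add(new Event[] {l, r});
                }
            }
        }
        return out;
    }

    /** The emission that follows emitTime happens EMIT_EVERY later. */
    static Instant nextEmit(Instant emitTime) {
        return emitTime.plus(EMIT_EVERY);
    }
}
```

Calling `matchesAt` at successive `nextEmit` times gives the "sliding" effect: the same pair keeps being reported until one side drops out of the lookback.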
The good news is that it should be possible to implement in Beam Java (probably in Python as well). The bad news is that it would probably be non-trivial in Java, and I don't think it's possible at all in SQL at the moment.
What it would probably look like:
- a ParDo (or this) which keeps track of all seen elements by storing them in a state cell:
  - a CoGroupByKey beforehand, to have access to elements from both inputs in the same ParDo;
  - CoGroupByKey has different semantics and might not be easy to work with.
I suggest you read through this example. It does the timer and state part of what you need (it waits for matching records, keeps the unmatched records in the state, and clears state on timer firing) and uses a CoGroupByKey. You might have a better idea of how it works after you understand this example.
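As a rough mental model of the state-and-timer part, here is a small sketch in plain Java with no Beam dependency. It is not the linked example and not Beam API: `BufferedJoinModel`, `KeyState`, `onLeft`, `onRight` and `onTimer` are hypothetical names. In a real pipeline the per-key buffers would live in a state cell inside the ParDo, the two inputs would be brought together beforehand (for example with a CoGroupByKey), and a Beam timer would drive the `onTimer` call. The sketch also assumes timestamps are epoch milliseconds and that no element is newer than the timer's fire time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BufferedJoinModel {
    // 30-minute lookback, in milliseconds.
    static final long LOOKBACK_MS = 30 * 60 * 1000L;

    /** Stand-in for a per-key state cell: timestamps seen so far on each input. */
    static final class KeyState {
        final List<Long> left = new ArrayList<>();
        final List<Long> right = new ArrayList<>();
    }

    // TreeMap only so that the output order is deterministic in this sketch.
    private final Map<String, KeyState> state = new TreeMap<>();

    /** Buffer an element from the first input. */
    void onLeft(String key, long ts) {
        state.computeIfAbsent(key, k -> new KeyState()).left.add(ts);
    }

    /** Buffer an element from the second input. */
    void onRight(String key, long ts) {
        state.computeIfAbsent(key, k -> new KeyState()).right.add(ts);
    }

    /**
     * Stand-in for the timer callback: evict elements at or before
     * fireTime - LOOKBACK_MS, drop keys with nothing left, and emit one
     * "key:leftTs:rightTs" string per remaining pair.
     */
    List<String> onTimer(long fireTime) {
        long cutoff = fireTime - LOOKBACK_MS;
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, KeyState>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, KeyState> entry = it.next();
            KeyState s = entry.getValue();
            s.left.removeIf(ts -> ts <= cutoff);
            s.right.removeIf(ts -> ts <= cutoff);
            if (s.left.isEmpty() && s.right.isEmpty()) {
                it.remove(); // clear state for keys that have fully expired
                continue;
            }
            for (long l : s.left) {
                for (long r : s.right) {
                    out.add(entry.getKey() + ":" + l + ":" + r);
                }
            }
        }
        return out;
    }

    /** Number of keys that still hold buffered elements. */
    int bufferedKeys() {
        return state.size();
    }
}
```

Unlike the linked example, which keeps only unmatched records, this model keeps every element until it expires, so a pair is emitted again on each firing while both sides are still inside the lookback. Whether you want that or one-time emission is a design choice you would have to make in the real ParDo.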