I have a requirement to collect multiple events and emit them as a single event. The output event should include the input events as a list.
For example, when the following events arrive:
{InvoiceNumber:1, LineItem: 1, Value: 100}
{InvoiceNumber:1, LineItem: 2, Value: 200}
{InvoiceNumber:1, LineItem: 3, Value: 300}
I need the output to look like this:
{InvoiceNumber: 1, lineItems: [{LineItem: 1, Value: 100}, {LineItem: 2, Value: 200}, {LineItem: 3, Value: 300}]}
How do I achieve this with WSO2 Streaming Integrator or Siddhi.io?
I attempted the following, but it still inserts each input event into the output stream separately:
partition with (InvoiceNo of CSVInputStream)
begin
from every e1=CSVInputStream
within 1 min
select e1.InvoiceNo, list:collect(e1.LineItem) as lineItems
group by e1.InvoiceNo
insert into AggregatedStream;
end;
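You do not need partitions for this; it is a simple use case that a window can handle. The pattern `from every e1=CSVInputStream` matches each event on its own, which is why you get one output per input. Use a time batch window instead (https://siddhi.io/en/v5.1/docs/api/latest/#timebatch-window). It holds the events that arrive during each one-minute period and releases them together, so `list:collect` aggregates over the whole batch for each InvoiceNo: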
from CSVInputStream#window.timeBatch(1 min)
select InvoiceNo, list:collect(LineItem) as lineItems
group by InvoiceNo
insert into AggregatedStream;
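For context, here is a minimal sketch of how the query above might sit inside a complete Siddhi app. The app name, the attribute types, the log sink, and the second `list:collect(Value)` column are assumptions for illustration and are not taken from the question. `list:collect` comes from the siddhi-execution-list extension, which is assumed to be available in the runtime.

```
@App:name('InvoiceAggregator')

-- Assumed schema: the question does not state the attribute types.
define stream CSVInputStream (InvoiceNo int, LineItem int, Value double);

-- list:collect returns a list, so the collected attributes are typed as object.
-- The log sink is only here to make the output visible while testing.
@sink(type = 'log')
define stream AggregatedStream (InvoiceNo int, lineItems object, lineValues object);

-- Collect everything that arrives within each one-minute batch,
-- grouped by invoice, into one output event per invoice.
-- lineValues is an addition beyond the answer: a parallel list of values.
@info(name = 'collect-line-items')
from CSVInputStream#window.timeBatch(1 min)
select InvoiceNo,
       list:collect(LineItem) as lineItems,
       list:collect(Value) as lineValues
group by InvoiceNo
insert into AggregatedStream;
```

Note that this produces two parallel lists rather than the list of {LineItem, Value} objects shown in the question. Building one nested object per line item would need an extra step that is not shown here.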