wso2siddhiwso2-streaming-integrator

WSO2 Streaming Integrator - Siddhi.io Aggregate multiple events into one event


I have a requirement to collect multiple events into a single one and consider it as a single event. The output stream should include the input events as a list.

Basically, when the following events are passed

{InvoiceNumber:1, LineItem: 1, Value: 100}
{InvoiceNumber:1, LineItem: 2, Value: 200}
{InvoiceNumber:1, LineItem: 3, Value: 300}

I need the output to be like the following

[InvoiceNumber:1, lineItems: [{LineItem: 1, Value: 100}, {LineItem: 2, Value: 200}, {LineItem: 3, Value: 300}]

How do I achieve this with WSO2 Streaming Integrator? Or Siddhi.io.

I attempted the following but it still inserting each stream into the output stream

partition with (InvoiceNo of CSVInputStream)
begin

    from every e1=CSVInputStream
        within 1 min
    select e1.InvoiceNo, list:collect(e1.LineItem) as lineItems
    group by e1.InvoiceNo
    insert into AggregatedStream;
end;

Solution

  • Do not use partitions, as this is a simple use case, try windows. Time batch windows in your case, https://siddhi.io/en/v5.1/docs/api/latest/#timebatch-window

    from CSVInputStream#window.timeBatch(1 min)
    select e1.InvoiceNo, list:collect(e1.LineItem) as lineItems
    group by e1.InvoiceNo
    insert into AggregatedStream;