hazelcast-jet

Hazelcast Jet - drain the list to a stream


My Jet job transforms data from a Redis stream. For every item in the stream it looks up a map; if the key is found, the result is a list of one or more items. I would like to write those items to the draining stream as separate elements, not as a single list element.

My code works, but it writes the whole list as a single item to another Redis stream. What I need is to write each element of the list separately, so that the other job can work on the items independently.

Code

pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
        .withIngestionTimestamps()
        .groupingKey(k -> k.get("eventType"))
        .mapUsingContext(lookupService(), (svc, event, item) -> svc.findHooks(event) /*returns list*/) 
        .drainTo(RedisSinks.stream("drain", uri, "hooks"));

So, the returned list from the service should be written as separate elements in the output stream.

Which API can I use to emit each item? I couldn't find much in the docs.


Solution

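  • To map one item into multiple items, you need to use the flat map transform instead of the simple map transform. The flat map function returns a Traverser over the output items, and each item it yields is emitted downstream as a separate element.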

    Example below:

    pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
            .withIngestionTimestamps()
            .groupingKey(k -> k.get("eventType"))
            .flatMapUsingContext(lookupService(), (svc, event, item) -> Traversers.traverseIterable(svc.findHooks(event) /* returns list */))
            .drainTo(RedisSinks.stream("drain", uri, "hooks"));
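The difference between the two transforms can be illustrated without a Jet cluster. The sketch below is only an analogy built on plain java.util.stream, not the Jet API: the class FlatMapSketch, the HOOKS map, and this findHooks method are hypothetical stand-ins for the lookup service in the question. It shows that map yields one list per input event, while flatMap yields each list element as its own item and silently drops events with no hooks.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FlatMapSketch {
    // Hypothetical lookup table standing in for the service from the question.
    static final Map<String, List<String>> HOOKS = new HashMap<>();
    static {
        HOOKS.put("created", Arrays.asList("hookA", "hookB"));
        HOOKS.put("deleted", Collections.singletonList("hookC"));
    }

    // Returns the hooks for an event type, or an empty list if none are registered.
    static List<String> findHooks(String eventType) {
        return HOOKS.getOrDefault(eventType, Collections.emptyList());
    }

    // map: exactly one output element per input event, and that element is a whole list.
    static List<List<String>> mapped(List<String> events) {
        return events.stream()
                .map(FlatMapSketch::findHooks)
                .collect(Collectors.toList());
    }

    // flatMap: every element of every returned list becomes its own output element.
    static List<String> flatMapped(List<String> events) {
        return events.stream()
                .flatMap(e -> findHooks(e).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("created", "unknown", "deleted");
        System.out.println(mapped(events));     // [[hookA, hookB], [], [hookC]]
        System.out.println(flatMapped(events)); // [hookA, hookB, hookC]
    }
}
```

In the Jet pipeline, Traversers.traverseIterable plays the role that findHooks(e).stream() plays here: it wraps the returned list so the stage can emit its elements one at a time.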