My Jet job transforms data from a Redis stream. For every item in the stream I look up a map; if the key is found, the value is a list of one or more items. I would like to write those items to the sink stream as separate entries, not as a single list element.
My code works, but it writes the whole list as one item to the other Redis stream. What I need is to write each element of the list separately, so that the downstream job can work on the items independently.
Code:
pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
.withIngestionTimestamps()
.groupingKey(k -> k.get("eventType"))
.mapUsingContext(lookupService(), (svc, event, item) -> svc.findHooks(event) /*returns list*/)
.drainTo(RedisSinks.stream("drain", uri, "hooks"));
So each element of the list returned by the service should be written as a separate entry in the output stream.
What API can I use to emit each item? I couldn't find much in the docs.
To map one item into multiple items, use the flat-map transform instead of the plain map transform. The flat-map function must return a Traverser, so wrap the list with Traversers.traverseIterable.
Example below:
pipeline.drawFrom(RedisSources.stream("source", uri, "payloads", "$"))
.withIngestionTimestamps()
.groupingKey(k -> k.get("eventType"))
.flatMapUsingContext(lookupService(), (svc, event, item) -> Traversers.traverseIterable(svc.findHooks(event)) /* findHooks returns a list; the traverser emits each element as its own item */)
.drainTo(RedisSinks.stream("drain", uri, "hooks"));
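To see the difference between the two transforms without a Jet cluster, here is a minimal sketch that uses plain java.util.stream as an analogy. It is not the Jet API: the class name FlatMapSketch, the HOOKS map and the hook names are hypothetical stand-ins for the lookup service in the question.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapSketch {
    // Hypothetical stand-in for the lookup service: event type -> hooks.
    static final Map<String, List<String>> HOOKS = new HashMap<>();
    static {
        HOOKS.put("created", Arrays.asList("hook-a", "hook-b"));
        HOOKS.put("deleted", Collections.singletonList("hook-c"));
    }

    // Returns the hooks for an event type, or an empty list if none are registered.
    static List<String> findHooks(String eventType) {
        return HOOKS.getOrDefault(eventType, Collections.emptyList());
    }

    // map: one output per input, so each output is a whole list.
    static List<List<String>> mapped(List<String> events) {
        return events.stream()
                .map(FlatMapSketch::findHooks)
                .collect(Collectors.toList());
    }

    // flatMap: each input expands to zero or more outputs, emitted one by one.
    static List<String> flatMapped(List<String> events) {
        return events.stream()
                .flatMap(e -> findHooks(e).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("created", "unknown", "deleted");
        System.out.println(mapped(events));     // [[hook-a, hook-b], [], [hook-c]]
        System.out.println(flatMapped(events)); // [hook-a, hook-b, hook-c]
    }
}
```

Note that an event with no hooks simply produces no output under flat map, which is usually what you want when the results go to a sink.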