I am using Redis with the Jedis client , I have two classes , a Producer one and a Consumer one .The data sent by the producer class over streams is not being recieved well by the Consumer class . It seems there is loss of data but I do not know the source or how to mitigate this problem . My Streams producer code snippet :
Map<String, String> messageBody = new HashMap<>();
for(String basekey : args){
Set<String> prices = client.keys("RTSH:"+basekey +"*");
for(String price_key : prices){
messageBody.put("json_key", price_key);
client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
System.out.println(messageBody);
}
}
My streams consumer snippet :
HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
PooledConnectionProvider provider = new PooledConnectionProvider(config);
UnifiedJedis client = new UnifiedJedis(provider);
XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.block(0);
xReadGroupParams.count(1);
xReadGroupParams.noAck();
try {
client.xgroupCreate(STREAMS_KEY, "RTSH_consumers",StreamEntryID.LAST_ENTRY,true);
}catch(Exception redisBusyException) {
System.out.println( String.format("\t Group '%s' already exists", "application_1"));
}
Map<String, StreamEntryID> stream = new HashMap<>();
stream.put(STREAMS_KEY,StreamEntryID.UNRECEIVED_ENTRY);
while(true) {
List<Map.Entry<String, List<StreamEntry>>> messages = client.xreadGroup("RTSH_consumers", "consumer_RTSH", xReadGroupParams, stream);
if (!messages.isEmpty()) {
for (Map.Entry<String, List<StreamEntry>> entry : messages) {
String key = entry.getKey();//key is the stream name
List<StreamEntry> value = entry.getValue();
for (StreamEntry StreamEntry : value) {
String json_key = StreamEntry.getFields().get("json_key");
Object prices = client.jsonGet(json_key);
System.out.println(prices);
System.out.println("\n");
client.xack(STREAMS_KEY, "RTSH_consumers", StreamEntry.getID());
}
}
}
}
I tried fetching less data but the problem persists.
In xgroupCreate
method, send new StreamEntryID()
instead of StreamEntryID.LAST_ENTRY.