java redis stream jedis redisjson

Redis Streams data missing


I am using Redis with the Jedis client. I have two classes, a producer and a consumer. The data the producer sends over a stream is not fully received by the consumer. Some data seems to be lost, but I do not know the cause or how to fix it. My stream producer code snippet:

    Map<String, String> messageBody = new HashMap<>();
    for (String basekey : args) {
        Set<String> prices = client.keys("RTSH:" + basekey + "*");
        for (String price_key : prices) {
            messageBody.put("json_key", price_key);
            client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
            System.out.println(messageBody);
        }
    }

My stream consumer snippet:

    HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
    PooledConnectionProvider provider = new PooledConnectionProvider(config);
    UnifiedJedis client = new UnifiedJedis(provider);
    XReadGroupParams xReadGroupParams = new XReadGroupParams();
    xReadGroupParams.block(0);
    xReadGroupParams.count(1);
    xReadGroupParams.noAck();
    try {
        client.xgroupCreate(STREAMS_KEY, "RTSH_consumers", StreamEntryID.LAST_ENTRY, true);
    } catch (Exception redisBusyException) {
        System.out.println(String.format("\t Group '%s' already exists", "application_1"));
    }
    Map<String, StreamEntryID> stream = new HashMap<>();
    stream.put(STREAMS_KEY, StreamEntryID.UNRECEIVED_ENTRY);
    while (true) {
        List<Map.Entry<String, List<StreamEntry>>> messages = client.xreadGroup("RTSH_consumers", "consumer_RTSH", xReadGroupParams, stream);
        if (!messages.isEmpty()) {
            for (Map.Entry<String, List<StreamEntry>> entry : messages) {
                String key = entry.getKey(); // key is the stream name
                List<StreamEntry> value = entry.getValue();
                for (StreamEntry streamEntry : value) {
                    String json_key = streamEntry.getFields().get("json_key");
                    Object prices = client.jsonGet(json_key);
                    System.out.println(prices);
                    System.out.println("\n");
                    client.xack(STREAMS_KEY, "RTSH_consumers", streamEntry.getID());
                }
            }
        }
    }

I tried fetching less data but the problem persists.


Solution

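  • In the xgroupCreate call, pass new StreamEntryID() (the ID 0-0) instead of StreamEntryID.LAST_ENTRY. LAST_ENTRY corresponds to the special ID "$", so the group only receives entries added after the group was created, and anything the producer wrote before that is skipped. Starting the group at 0-0 lets it read the stream from the beginning.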
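
To make the difference between the two start IDs concrete, here is a toy in-memory model. It is not Jedis or Redis code, and every name in it (GroupStartDemo, createGroupAtEnd, createGroupAtStart, readNew, deliveredAfterBacklog) is hypothetical. It only sketches the idea, assuming the producer has already written some entries by the time the consumer creates the group:

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of one stream and one consumer group.
// This is NOT Jedis or Redis; all names here are hypothetical.
final class GroupStartDemo {
    private final List<String> entries = new ArrayList<>();
    private int nextIndex; // position of the next entry the group will deliver

    void add(String value) {
        entries.add(value);
    }

    // Models creating the group at "$": skip everything already in the stream.
    void createGroupAtEnd() {
        nextIndex = entries.size();
    }

    // Models creating the group at 0-0: start from the first entry.
    void createGroupAtStart() {
        nextIndex = 0;
    }

    // Models reading with ">": return entries not yet delivered to the group.
    List<String> readNew() {
        List<String> out = new ArrayList<>(entries.subList(nextIndex, entries.size()));
        nextIndex = entries.size();
        return out;
    }

    // The producer writes "a" and "b" before the group exists, then "c" afterwards.
    static List<String> deliveredAfterBacklog(boolean fromStart) {
        GroupStartDemo stream = new GroupStartDemo();
        stream.add("a");
        stream.add("b");
        if (fromStart) {
            stream.createGroupAtStart();
        } else {
            stream.createGroupAtEnd();
        }
        stream.add("c");
        return stream.readNew();
    }

    public static void main(String[] args) {
        System.out.println(deliveredAfterBacklog(false)); // [c]
        System.out.println(deliveredAfterBacklog(true));  // [a, b, c]
    }
}
```

In the model, a group created at the end of the stream never sees "a" and "b", which matches the missing data described in the question when the producer runs before the consumer has created its group.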