javaredisjedisredis-cluster

How to listen for keyspace notifications of redis cluster in java using jedis client?


I am setting a server which can listen and send message about any event occur in the redis database. I am successful in getting notified about new events for redis host and port but not able to do so for redis cluster.

GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(30);
config.setMaxWaitMillis(2000);

Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7001));

JedisCluster cluster1 = new JedisCluster(jedisClusterNode, config);

String redisProperties = cluster1.getClusterNodes().toString().replaceAll("[{}]", "");
Set<HostAndPort> nodes = new HashSet<>();
String[] mainArray = redisProperties.split(",");
for (int i = 0; i < mainArray.length; i++) {
    String[] equalArray = mainArray[i].split("=");
    String mainData = equalArray[0];
    String[] ipPortPair = mainData.split(":");
    nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
}

JedisCluster cluster = new JedisCluster(nodes, 10000, 1000, 1, config);
jedis.configSet("notify-keyspace-events", "AKE"); // For all kind of events
jedis.psubscribe(new KeyListenerCluster(), "__keyevent@0__:*");

I am able to perform every other operation in using the redis-cluster but not able to do one thing.

cluster.configSet("notify-keyspace-events", "AKE"); // For all kind of events
cluster.psubscribe(new KeyListenerCluster(), "__keyevent@0__:*");

Solution

  • I am facing the same issue, Keyspace notification is working intermittently (works 6 or 7 times out of 10). I read that one needs to subscribe to all of the master nodes in order to get the notifications. If it helps I have pasted my config file below :

    KeySpaceNotificationMessageListener keySpaceNotificationMessageListener;
    
        @Value("${spring.redis.cluster.nodes}")
        private String hostsAndPorts;
    
        @Bean
        JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration redisClusterConfiguration)
        {
            return new JedisConnectionFactory(redisClusterConfiguration);
        }
    
        @Bean
        MessageListenerAdapter messageListener() {
            return new MessageListenerAdapter(keySpaceNotificationMessageListener);
        }
    
    
        @Bean(name = "cacheManager1")
        @Primary
        public RedisCacheManager redisCacheManager1()
        {
            RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                    .disableCachingNullValues()
                    .entryTtl(Duration.ofMinutes(1))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.json()));
    
            redisCacheConfiguration.usePrefix();
    
            return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(jedisConnectionFactory(redisClusterConfiguration()))
                    .cacheDefaults(redisCacheConfiguration).build();
    
        }
    
        @Bean(name = "cacheManager2")
        public RedisCacheManager redisCacheManager2( JedisConnectionFactory jedisConnectionFactory)
        {
            RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                    .disableCachingNullValues()
                    .entryTtl(Duration.ofDays(1))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.json()));
    
            redisCacheConfiguration.usePrefix();
    
            return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(jedisConnectionFactory(redisClusterConfiguration()))
                    .cacheDefaults(redisCacheConfiguration).build();
    
        }
    
        @Bean
        public RedisClusterConfiguration redisClusterConfiguration()
        {
            String [] redisHostAndPorts = hostsAndPorts.split(",");
    
            System.out.println(Arrays.toString(redisHostAndPorts));
            RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(Arrays.asList(redisHostAndPorts));
            return redisClusterConfiguration;
        }
    
        @Bean
        RedisMessageListenerContainer redisContainer() {
            final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(jedisConnectionFactory(redisClusterConfiguration()));
            container.addMessageListener(messageListener(), new PatternTopic("__keyspace@*:*"));
            container.setTaskExecutor(Executors.newFixedThreadPool(4));
            return container;
        }
    
        public void setKeySpaceNotificationMessageListener(KeySpaceNotificationMessageListener keySpaceNotificationMessageListener)
        {
            this.keySpaceNotificationMessageListener = keySpaceNotificationMessageListener;
        }