How can I use an ElastiCache Redis Replication Group as a data sink in Flink for Kinesis Analytics?
I have created an ElastiCache Redis Replication Group and would like to compute something in Flink and store the results in this group.
My Java code:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import java.net.InetSocketAddress;
import java.util.Set;
...
// Configuration endpoint of the replication group
var endpoint = "foo.bar.clustercfg.usw2.cache.amazonaws.com";
var port = 6379;
var node = new InetSocketAddress(endpoint, port);
// Cluster config: Jedis discovers the remaining nodes via CLUSTER NODES
var jedisConfig = new FlinkJedisClusterConfig.Builder()
        .setNodes(Set.of(node))
        .build();
var redisMapper = new MyRedisMapper();
var redisSink = new RedisSink<>(jedisConfig, redisMapper);
This gives me the following error:
java.lang.NumberFormatException: For input string: "6379@1122"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.valueOf(Integer.java:983)
at redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39)
at redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14)
at redis.clients.jedis.JedisClusterInfoCache.discoverClusterNodesAndSlots(JedisClusterInfoCache.java:50)
at redis.clients.jedis.JedisClusterConnectionHandler.initializeSlotsCache(JedisClusterConnectionHandler.java:39)
at redis.clients.jedis.JedisClusterConnectionHandler.<init>(JedisClusterConnectionHandler.java:28)
at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:21)
at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:16)
at redis.clients.jedis.BinaryJedisCluster.<init>(BinaryJedisCluster.java:39)
at redis.clients.jedis.JedisCluster.<init>(JedisCluster.java:45)
This occurs while trying to parse the response of CLUSTER NODES. The ip:port@cport format is expected as part of the response (see https://redis.io/commands/cluster-nodes/), but Jedis is unable to parse it.
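To illustrate what I think is going wrong, here is a minimal sketch in plain Java. It is not Jedis code: the class name, both helper methods, and the sample address are made up for illustration. It assumes the parser takes everything after the last ':' as the port, which would explain why "6379@1122" ends up in Integer.parseInt.

```java
// Illustrative only: these helpers are hypothetical and are not taken from Jedis.
final class ClusterNodeAddress {

    // Naive approach: treat everything after the last ':' as the port.
    // With "ip:port@cport" this passes "port@cport" to Integer.parseInt.
    static int naivePort(String address) {
        return Integer.parseInt(address.substring(address.lastIndexOf(':') + 1));
    }

    // Tolerant approach: drop the optional "@cport" suffix before parsing.
    static int clientPort(String address) {
        int at = address.indexOf('@');
        String hostAndPort = (at >= 0) ? address.substring(0, at) : address;
        return Integer.parseInt(hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        String address = "10.0.0.1:6379@1122"; // made-up sample address
        System.out.println(clientPort(address));
        try {
            naivePort(address);
        } catch (NumberFormatException e) {
            // Same exception type as in the stack trace above
            System.out.println(e.getMessage());
        }
    }
}
```

The tolerant version also accepts the older ip:port form without a cluster bus port, so it should handle both response formats.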
Am I doing something wrong here, or is this a bug in Jedis?
After a little digging I found that this is a bug that affects Jedis 2.8 and earlier when used with Redis 4.0 or later, which is when the @cport suffix started appearing in the CLUSTER NODES output. See https://github.com/redis/jedis/issues/1958
My Redis cluster is running 6.2.6, and I am on Apache Flink 1.13, which is old but is the newest version currently supported by AWS.
To solve this, I had to upgrade Jedis to the latest 2.x version, which includes the fix and is still compatible with the Flink 1.13 libraries. Upgrading Jedis to a 3.x or 4.x version broke Flink.
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.10.2</version>
</dependency>