Tags: .net, redis, parallel-processing, stackexchange.redis, pipelining

Is this correct implementation of pipelining in StackExchange.Redis in .NET?


I'm trying to increase performance of reading hashes from Redis and I've tried to implement pipelining, is this the correct way to do this?

public Task FetchHashesFromRedis(List<string> redisKeys, CancellationToken cancellationToken)
{
    var parallel = Environment.ProcessorCount;
    var semaphore = new SemaphoreSlim(initialCount: parallel, maxCount: parallel);
    var tasks = new List<Task>();

    for (int i = 0; i < redisKeys.Count; i++)
    {
        var currentKey = redisKeys[i];
        var task = FetchFromRedis(currentKey, semaphore, cancellationToken);
        tasks.Add(task);
    }

    return Task.WhenAll(tasks);
}

What could be improved in this code?


Solution

  • StackExchange.Redis will automatically pipeline everything for you if you use the async API and collect the resulting tasks. You will also want to make sure, in the case of hashes, that you take advantage of the variadic HSET/HMGET commands to decrease the number of raw commands you need to issue to Redis.

    I'm not sure what FetchFromRedis does in the context of the code you provided. Assuming it asynchronously executes one of the HGET/HMGET/HGETALL operations and returns the resulting task, you're probably not too far off. But it's not clear from the snippet how you are pulling the results out of the completed tasks.
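    For reference, here is a hypothetical sketch of what FetchFromRedis could look like, assuming it throttles via the semaphore and returns the hash fields. The method body and the db variable are assumptions, not taken from your code:

    ```csharp
    // Hypothetical sketch: the semaphore bounds how many commands are in flight,
    // and the HGETALL result is returned so callers can read it from the task.
    private static async Task<HashEntry[]> FetchFromRedis(
        string key, SemaphoreSlim semaphore, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            // db is an IDatabase from your ConnectionMultiplexer
            return await db.HashGetAllAsync(key);
        }
        finally
        {
            semaphore.Release();
        }
    }
    ```

    Returning Task<HashEntry[]> (rather than plain Task) is what lets the caller actually read the results after Task.WhenAll completes.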

    The following code creates and then reads 10k hashes in Redis. It's a somewhat trivial example, but it demonstrates the behavior you are looking for. On my machine (granted, I'm going against localhost), it executes in about 75 ms, or around 3.8 μs per operation. When it completes, the results are all available in the tasks of getTasks.

    You will want to make sure (with a realistic data load) that you are not pushing this too hard, though: hammering StackExchange.Redis can cause it to throw timeout exceptions, because if you queue up too much work, operations are liable to get stuck in the multiplexer's message queue and fail.

    // db is an IDatabase obtained from a ConnectionMultiplexer
    var stopwatch = Stopwatch.StartNew();

    var setTasks = new List<Task>();
    
    for (var i = 0; i < 10000; i++)
    {
        setTasks.Add(db.HashSetAsync($"hash:{i}", new HashEntry[]{new HashEntry("foo", "bar"), new HashEntry("baz", 3)}));
    }
    
    await Task.WhenAll(setTasks);
    
    var getTasks = new List<Task<RedisValue[]>>(); 
    
    for (var i = 0; i < 10000; i++)
    {
        getTasks.Add(db.HashGetAsync($"hash:{i}", new RedisValue[]{"foo","baz"}));
    }
    
    await Task.WhenAll(getTasks);
    
    stopwatch.Stop();
    
    Console.WriteLine($"total execution time: {stopwatch.ElapsedMilliseconds}ms");
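
    If you do start seeing those timeouts under a realistic load, one mitigation is to cap the number of commands in flight at any one time. A sketch using SemaphoreSlim follows; the limit of 100 here is an arbitrary assumption you would tune for your workload:

    ```csharp
    // Bound the number of concurrent GETs so the multiplexer's queue stays small.
    var semaphore = new SemaphoreSlim(initialCount: 100, maxCount: 100);

    async Task<RedisValue[]> GetThrottledAsync(string key)
    {
        await semaphore.WaitAsync();
        try
        {
            return await db.HashGetAsync(key, new RedisValue[] { "foo", "baz" });
        }
        finally
        {
            semaphore.Release();
        }
    }

    var getTasks = new List<Task<RedisValue[]>>();
    for (var i = 0; i < 10000; i++)
    {
        getTasks.Add(GetThrottledAsync($"hash:{i}"));
    }

    await Task.WhenAll(getTasks);
    ```

    This trades a little throughput for predictable memory use and queue depth; up to 100 commands are still pipelined at once.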