asp.net  .net  redis  redis-streams

How to use XREAD with .NET Redis to read new changes


When I set a breakpoint on the line with XREAD, the program does nothing. Maybe I need to configure the XREAD command?

    public async void ListenTask()
    {
        var readTask = Task.Run(async () =>
        {
            while (!Token.IsCancellationRequested)
            {
                var result = db.StreamRead(streamName, "$", 1);

                if (result.Any())
                {
                    var dict = ParseResult(result.Last());
                    var sb = new StringBuilder();
                    foreach (var key in dict.Keys)
                    {
                        sb.Append(dict[key]);
                    }

                    Console.WriteLine(sb.ToString());

                }

                await Task.Delay(1000);
            }
        });
    }

Solution

  • It's an illegal operation on StackExchange.Redis

    Because of its unique multiplexed architecture, StackExchange.Redis (the library you appear to be using) does not support blocking XREAD operations. All the commands going over the interactive interface (basically everything that isn't pub/sub) use the same connection. If you block that connection, everything else in your app that depends on the multiplexer will be backed up waiting for the block to complete. The StackExchange.Redis library actually goes so far as to consider the $ id an illegal id for reads; its only purpose, after all, is to block.

    What's most likely happening is that var result = db.StreamRead(streamName, "$", 1); is throwing an InvalidOperationException: System.InvalidOperationException: StreamPosition.NewMessages cannot be used with StreamRead. You don't see it because the exception is thrown inside the Task.Run delegate and the resulting task is never awaited, so it is silently swallowed.
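
    One way to check this is to await the task and catch the exception explicitly. The following is only a sketch: the class and method names are made up for illustration, and it assumes you already have a connected IDatabase.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using StackExchange.Redis;

    // Hypothetical helper, not part of StackExchange.Redis.
    public static class SwallowedExceptionDemo
    {
        public static async Task ShowAsync(IDatabase db, RedisKey streamName)
        {
            // Same call as in the question, wrapped in a task.
            var readTask = Task.Run(() => db.StreamRead(streamName, "$", 1));
            try
            {
                // Awaiting the task rethrows whatever was thrown inside it.
                await readTask;
            }
            catch (InvalidOperationException ex)
            {
                // If the "$" position is rejected, the message shows up here.
                Console.WriteLine(ex.Message);
            }
        }
    }
    ```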

    Workarounds

    There are two potential workarounds in this case. First, you can poll with the XRANGE command rather than using blocking reads.

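    var readTask = Task.Run(async () =>
    {
        // "-" is the smallest possible ID, so the first poll reads the stream from the beginning.
        var lastId = "-";
        while (!token.IsCancellationRequested)
        {
            // XRANGE bounds are inclusive, so after the first poll the entry at lastId is returned again.
            var result = await db.StreamRangeAsync("a-stream", lastId, "+");
            if (result.Any())
            {
                lastId = result.Last().Id;
            }
            await Task.Delay(1000);
        }
    });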
    

    You're already effectively doing a thread sleep so this polling operation is probably good enough for what you're looking for.

    If you really need to do blocking operations, you'll have to use a different library. If you try to do them with StackExchange.Redis (you can possibly force the issue with the Execute/ExecuteAsync commands), you can seriously degrade its performance.

    Articles on doing so with ServiceStack.Redis and CsRedis are available on the Redis developer site (I'm the author of them).

    One final thing

    You probably want to make sure that you are being as async as possible when issuing these commands. You're using the sync XREAD command in an async context. Almost every command in StackExchange.Redis has both a sync and an async version, so use the async one when possible.
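
    As a rough sketch of what that could look like, the loop below polls with the non-blocking StreamReadAsync and remembers the last ID it saw instead of using $. The class name, method name, batch size of 10 and one-second delay are my own choices for illustration, and it assumes a connected IDatabase.

    ```csharp
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using StackExchange.Redis;

    // Hypothetical helper, not part of StackExchange.Redis.
    public static class StreamPoller
    {
        // Returns a Task (not async void) so the caller can await it and observe exceptions.
        public static async Task ListenAsync(IDatabase db, RedisKey streamName, CancellationToken token)
        {
            // Start after the newest existing entry, if there is one,
            // so that only entries added from now on are reported.
            RedisValue lastId = "0-0";
            var newest = await db.StreamRangeAsync(streamName, count: 1, messageOrder: Order.Descending);
            if (newest.Length > 0)
            {
                lastId = newest[0].Id;
            }

            while (!token.IsCancellationRequested)
            {
                // Non-blocking XREAD: returns entries with IDs greater than lastId.
                StreamEntry[] entries = await db.StreamReadAsync(streamName, lastId, 10);
                foreach (var entry in entries)
                {
                    Console.WriteLine(string.Join(", ", entry.Values.Select(v => $"{v.Name}={v.Value}")));
                    lastId = entry.Id;
                }

                // Throws TaskCanceledException when the token is cancelled; catch it in the caller.
                await Task.Delay(1000, token);
            }
        }
    }
    ```

    You would call it with something like await StreamPoller.ListenAsync(db, "a-stream", cts.Token); from an async method, where cts is your own CancellationTokenSource.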