azure-functions signalr .net-8.0 azure-functions-isolated

How to send multiple async SignalR messages from one function in .NET 8


I'm currently migrating my Azure Functions from .NET 6 to .NET 8 on the isolated worker model. Below is a simplified example of how my real code works. SignalR is used to send messages to the front end about the progress of a long-running queue-triggered function. This no longer works on .NET 8 because the function can only return a single message, as its SignalRMessageAction return value. Is there a way to send more than one message, in sequential order, either through separate queues or by calling another SignalR message function?


        [Function("PerformBackgroundJob")]
        [SignalROutput(HubName = "progress", ConnectionStringSetting = "AzureSignalRConnectionString")]
        public SignalRMessageAction PerformBackgroundJob(
            [QueueTrigger(LibraryConstants.testQueue, Connection = "connectionstring-azure-storage")] string JobId)
        {
            var msg = new SignalRMsg();
            
            _logger.LogInformation("PerformBackgroundJob started");
            Stopwatch sw1 = new Stopwatch();
            sw1.Start();
            
            List<ProcessFileResponse> response = new List<ProcessFileResponse>();

            msg = new SignalRMsg()
            {
                UserId = JobId,
                Target = "taskStarted",
                Arguments = new object[] { "PerformBackgroundJob started" }
            };
            //Send a message here
            //SendMessage(msg);
            _logger.LogInformation("PerformBackgroundJob data reviewed");
            
            for (int i = 0; i < 100; i++)
            {
                msg = new SignalRMsg()
                {
                    UserId = JobId,
                    Target = "taskProgressChanged",
                    Arguments = new object[] { i + 1 }
                };
                //Send a message here
                //SendMessage(msg);
                Thread.Sleep(200);
            }
            sw1.Stop();
            TimeSpan ts = sw1.Elapsed;

            // Format and display the TimeSpan value.
            string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
                ts.Hours, ts.Minutes, ts.Seconds,
                ts.Milliseconds / 10);
            ProcessFileResponse outResponse = new ProcessFileResponse();
            outResponse.Errors = 0;
            outResponse.Warnings = 0;
            outResponse.Status = 1;
            outResponse.TabName = "Background action";
            outResponse.Message = $"RunTime: {elapsedTime}";
            response.Add(outResponse);
            
            _logger.LogInformation("PerformBackgroundJob completed");
            // This is the only message that is visible to the client
            return new SignalRMessageAction("taskEnded")
            {
                Arguments = new object[] { response },
                UserId = JobId
            };
        }

I tried to use a separate function to send the messages, but it doesn't work, or maybe I'm calling the function incorrectly.

[Function(nameof(SendMessage))]
[SignalROutput(HubName = "progress", ConnectionStringSetting = "AzureSignalRConnectionString")]
public static SignalRMessageAction SendMessage(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")] SignalRMsg req)
{
    return new SignalRMessageAction(req.Target)
    {
        Arguments = req.Arguments,
        UserId = req.UserId
    };
}

public class SignalRMsg
{
    public string UserId { get; set; }
    public string Target { get; set; }
    public object[] Arguments { get; set; }
}

Solution

  • OK, so the answer to my issue was to use the QueueServiceClient. First, in Program.cs, I added the following to the service configuration:

    services.AddAzureClients(builder => {
        builder.AddQueueServiceClient(azStorage).ConfigureOptions(options =>
            options.MessageEncoding = Azure.Storage.Queues.QueueMessageEncoding.Base64);
    });
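
    Why the encoding option matters can be sketched without any Azure dependency. To my understanding, the Azure.Storage.Queues client sends message text as-is unless told otherwise, while the Functions queue trigger expects Base64 by default. The snippet below only illustrates what Base64 encoding does to a message body; it is not the SDK's actual implementation, and the Encode and Decode helper names are made up for illustration.

    ```csharp
    using System;
    using System.Text;
    using System.Text.Json;

    public static class Base64QueueBodySketch
    {
        // Hypothetical helper: approximates what happens to the message text
        // on the way into the queue when MessageEncoding is Base64.
        public static string Encode(string body) =>
            Convert.ToBase64String(Encoding.UTF8.GetBytes(body));

        // Hypothetical helper: the reverse step on the consuming side.
        public static string Decode(string wireBody) =>
            Encoding.UTF8.GetString(Convert.FromBase64String(wireBody));

        public static void Main()
        {
            // "job-1" is a placeholder user id, not a value from the original code.
            string json = JsonSerializer.Serialize(new { Target = "taskStarted", UserId = "job-1" });

            string wire = Encode(json);
            Console.WriteLine(wire);                  // Base64 text, not readable JSON
            Console.WriteLine(Decode(wire) == json);  // True: the round trip is lossless
        }
    }
    ```

    If the sender and the trigger disagree on the encoding, the trigger may fail to decode the body, which is consistent with the messages not arriving before the option was set.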
    

    One important note: the MessageEncoding option is needed for sending messages that Azure Functions can process. Afterwards I added a new SignalR message function that is triggered by a QueueTrigger:

    [Function(nameof(SendMessageQueue))]
    [SignalROutput(HubName = "progress", ConnectionStringSetting = "AzureSignalRConnectionString")]
    public static SignalRMessageAction SendMessageQueue(
        [QueueTrigger("messagequeue", Connection = "connectionstring-azure-storage")] string messageQueue)
    {
        var message = JsonSerializer.Deserialize<SignalRMsg>(messageQueue);
        return new SignalRMessageAction(message.Target)
        {
            Arguments = message.Arguments,
            UserId = message.UserId
        };
    }
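
    One detail of the deserialization step above is worth checking in isolation. The sketch below round-trips the message class through System.Text.Json and shows that values in the object[] array come back as JsonElement rather than their original types; it also guards against Deserialize returning null. The "job-1" user id and the RoundTripSketch class name are placeholders, and whether the JsonElement values need converting before they are passed on depends on how the SignalR output binding serializes them, which I have not verified here.

    ```csharp
    using System;
    using System.Text.Json;

    public class SignalRMsg
    {
        public string UserId { get; set; } = "";
        public string Target { get; set; } = "";
        public object[] Arguments { get; set; } = Array.Empty<object>();
    }

    public static class RoundTripSketch
    {
        public static void Main()
        {
            var original = new SignalRMsg
            {
                UserId = "job-1",                 // placeholder user id
                Target = "taskProgressChanged",
                Arguments = new object[] { 42 }
            };

            // Same serializer calls as the sender and the queue-triggered function.
            string json = JsonSerializer.Serialize(original);
            var copy = JsonSerializer.Deserialize<SignalRMsg>(json);

            // Deserialize returns null for a literal "null" body, so check first.
            if (copy is null)
                throw new InvalidOperationException("Queue message body was not a SignalRMsg.");

            // object-typed values are materialized as JsonElement.
            Console.WriteLine(copy.Arguments[0].GetType().Name);              // JsonElement
            Console.WriteLine(((JsonElement)copy.Arguments[0]).GetInt32());   // 42
        }
    }
    ```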
    

    For ease of use I also created a small message class to pass the UserId, Target and Arguments directly:

    public class SignalRMsg
    {
        public string UserId { get; set; }
        public string Target { get; set; }
        
        public object[] Arguments { get; set; }
        public SignalRMsg() { }
    
    }
    

    After this I changed the PerformBackgroundJob function to accommodate these changes, and I was finally able to send multiple messages by routing them through a queue. First, I injected the queue service into my existing class:

    private readonly ILogger<BackgroundJobTest> _Logger;
    private readonly QueueClient _QueueClient;
    private readonly QueueServiceClient _QueueServiceClient;
    
    public BackgroundJobTest(ILogger<BackgroundJobTest> Logger, QueueServiceClient QueueServiceClient, IServiceScopeFactory serviceProvider, ITypeCurveNeighborhoodService typeCurveNeighborhoodService)
    {
        _Logger = Logger;
        _QueueClient = QueueServiceClient.GetQueueClient("messagequeue");
        _QueueServiceClient = QueueServiceClient;
    }
    

    Then I simply created and filled an instance of the message class from before, serialized it, and sent it as a message to the queue:

    msg = new SignalRMsg()
     {
         UserId = JobId,
         Target = "taskStarted",
         Arguments = new object[] { "PerformBackgroundJob started" }
     };
     //Send a message here
     var jsonMsg = JsonSerializer.Serialize<SignalRMsg>(msg);
     await _QueueClient.SendMessageAsync(jsonMsg);
     _Logger.LogInformation("PerformBackgroundJob data reviewed");
    

    One last part: the queue must exist in your storage account, otherwise sending the messages will fail. For this I created a small function that creates the queue if it doesn't exist:

    private void CheckExisitngQueue(string messageQueue)
    {
        // Use the queue name passed in, and create the queue only if it is missing.
        var queueClient = _QueueServiceClient.GetQueueClient(messageQueue);
        queueClient.CreateIfNotExists();
    }
    

    Here is the full function for example purposes:

    [Function("PerformBackgroundJob")]
    public async Task PerformBackgroundJob(
        [QueueTrigger("testQueue", Connection = "connectionstring-azure-storage")] string JobId)
    {
        
        var msg = new SignalRMsg();
        
            _Logger.LogInformation("PerformBackgroundJob started");
            Stopwatch sw1 = new Stopwatch();
            sw1.Start();
            CheckExisitngQueue("messagequeue");
            List<ProcessFileResponse> response = new List<ProcessFileResponse>();
    
            msg = new SignalRMsg()
            {
                UserId = JobId,
                Target = "taskStarted",
                Arguments = new object[] { "PerformBackgroundJob started" }
            };
            //Send a message here
            var jsonMsg = JsonSerializer.Serialize<SignalRMsg>(msg);
            await _QueueClient.SendMessageAsync(jsonMsg);
            _Logger.LogInformation("PerformBackgroundJob data reviewed");
        
            for (int i = 0; i < 100; i++)
            {
                msg = new SignalRMsg()
                {
                    UserId = JobId,
                    Target = "taskProgressChanged",
                    Arguments = new object[] { i + 1 } 
                };
                //Send a message here
                jsonMsg = JsonSerializer.Serialize<SignalRMsg>(msg);
                await _QueueClient.SendMessageAsync(jsonMsg);
                // Simulate work without blocking the thread in an async method.
                await Task.Delay(200);
            }
            sw1.Stop();
            TimeSpan ts = sw1.Elapsed;
    
            // Format and display the TimeSpan value.
            string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
                ts.Hours, ts.Minutes, ts.Seconds,
                ts.Milliseconds / 10);
            ProcessFileResponse outResponse = new ProcessFileResponse();
            outResponse.Errors = 0;
            outResponse.Warnings = 0;
            outResponse.Status = 1;
            outResponse.TabName = "Background action";
            outResponse.Message = $"RunTime: {elapsedTime}";
            response.Add(outResponse);
    
            _Logger.LogInformation("PerformBackgroundJob completed");
            
            msg = new SignalRMsg()
            {
                Target = "taskEnded",
                Arguments = new object[] { response },
                UserId = JobId
            };
            jsonMsg = JsonSerializer.Serialize<SignalRMsg>(msg);
            await _QueueClient.SendMessageAsync(jsonMsg);
    }
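
    The build, serialize and send steps repeat three times in the function above, so they could be folded into one small helper. This is only a sketch: ProgressReporter and ReportAsync are hypothetical names, and the send delegate stands in for the queue client so the example runs without Azure. In the real function it would presumably be wired up as body => _QueueClient.SendMessageAsync(body).

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Text.Json;
    using System.Threading.Tasks;

    public class SignalRMsg
    {
        public string UserId { get; set; } = "";
        public string Target { get; set; } = "";
        public object[] Arguments { get; set; } = Array.Empty<object>();
    }

    // Hypothetical helper: wraps "create SignalRMsg, serialize, enqueue" in one call.
    public class ProgressReporter
    {
        private readonly Func<string, Task> _send;
        private readonly string _userId;

        public ProgressReporter(Func<string, Task> send, string userId)
        {
            _send = send;
            _userId = userId;
        }

        public Task ReportAsync(string target, params object[] arguments)
        {
            var msg = new SignalRMsg { UserId = _userId, Target = target, Arguments = arguments };
            return _send(JsonSerializer.Serialize(msg));
        }
    }

    public static class ReporterSketch
    {
        public static async Task Main()
        {
            // An in-memory list stands in for the storage queue in this sketch.
            var sent = new List<string>();
            var reporter = new ProgressReporter(
                body => { sent.Add(body); return Task.CompletedTask; },
                "job-1"); // placeholder for JobId

            await reporter.ReportAsync("taskStarted", "PerformBackgroundJob started");
            for (int i = 0; i < 3; i++)
            {
                await reporter.ReportAsync("taskProgressChanged", i + 1);
            }
            await reporter.ReportAsync("taskEnded");

            Console.WriteLine(sent.Count); // 5
        }
    }
    ```

    Note that, as far as I know, Storage queues do not guarantee first-in-first-out delivery, so the front end may want to tolerate progress values arriving slightly out of order.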