Tags: azure, azureservicebus, azure-servicebus-topics, azure-servicebus-subscriptions

Handle event after SessionIdleTimeout passed on Azure service bus topic subscription


I'm using an Azure Service Bus topic with C# and ASP.NET Core. I have created a session-enabled subscription and I'm using ServiceBusSessionProcessor to receive messages from this topic.

I would like to set SessionIdleTimeout to 15 seconds and handle an event after this time has passed. If there is no message received I would like to update a record in my database and stop processing.

Is this possible? Is there any way I can know that no message was received in those 15 seconds?

Some of the code:

/// <summary>
/// Stores the collaborators, builds a session processor for the given topic
/// subscription restricted to <paramref name="sessionId"/>, and starts it.
/// </summary>
/// <param name="handler">Handler kept in <c>_handler</c>.</param>
/// <param name="topicName">Topic the processor is created for.</param>
/// <param name="subscriptionName">Subscription the processor is created for.</param>
/// <param name="sessionId">The only session id added to the processor options.</param>
/// <param name="serviceBusPersisterConnection">Provides the ServiceBusClient used to create the processor.</param>
/// <param name="messageHelper">Helper kept in <c>_messageHelper</c>.</param>
/// <param name="logger">Logger kept in <c>_logger</c>.</param>
public BaseSessionMessageProcessor(
        IMessageHandler<T> handler,
        string topicName,
        string subscriptionName,
        string sessionId,
        IServiceBusPersisterConnection serviceBusPersisterConnection,
        IMessageHelper messageHelper,
        IAppLogger<T> logger)
    {
        _logger = logger;
        _messageHelper = messageHelper;
        _handler = handler;

        var processorOptions = new ServiceBusSessionProcessorOptions
        {
            SessionIdleTimeout = TimeSpan.FromSeconds(15),
            AutoCompleteMessages = false,
            MaxConcurrentCallsPerSession = 1,
            MaxConcurrentSessions = 2
        };

        // Restrict the processor to the single session requested by the caller.
        // NOTE(review): one session id with MaxConcurrentSessions = 2 matches the documented
        // case in which the session is NOT closed when the idle timeout elapses - confirm
        // this is the intended behaviour.
        processorOptions.SessionIds.Add(sessionId);

        _processor = serviceBusPersisterConnection.ServiceBusClient.CreateSessionProcessor(
            topicName,
            subscriptionName,
            processorOptions);

        // Synchronously waits for the asynchronous registration, so construction blocks
        // until StartProcessingAsync has completed.
        RegisterSessionMessageProcessor().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Attaches the error and message callbacks to the processor, then starts it.
    /// </summary>
    /// <returns>A task that completes once <c>StartProcessingAsync</c> has completed.</returns>
    public async Task RegisterSessionMessageProcessor()
    {
        // Both callbacks are attached before the processor is started.
        _processor.ProcessErrorAsync += ErrorHandler;
        _processor.ProcessMessageAsync += SessionMessageHandler;

        await _processor.StartProcessingAsync();
    }

    /// <summary>
    /// Callback for each received session message; as written it only writes a log entry.
    /// </summary>
    /// <param name="args">Event data for the received message (not used by this body).</param>
    public async Task SessionMessageHandler(ProcessSessionMessageEventArgs args)
    {
        // NOTE(review): the message is never settled here although the processor options set
        // AutoCompleteMessages = false - presumably the real handling was trimmed; confirm.
        _logger.LogInformation("log-info: process started");
    }

    /// <summary>
    /// Callback for processor errors: logs the failure and returns a completed task.
    /// </summary>
    /// <param name="args">Error event data; its exception and error source are passed to the logger.</param>
    public Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // NOTE(review): the message has no placeholders, so whether the exception and error
        // source end up in the log depends on IAppLogger's LogError signature - confirm.
        _logger.LogError(
            "log-exception: An error occurred while trying to handle a message.",
            args.Exception,
            args.ErrorSource);

        return Task.CompletedTask;
    }

Solution

  • The SessionIdleTimeout property of the ServiceBusSessionProcessorOptions class is documented as follows:

    • Gets or sets the maximum amount of time to wait for a message to be received for the currently active session.
    • After this time has elapsed, the processor will close the session and attempt to process another session.
    • If SessionIds is populated and MaxConcurrentSessions is greater or equal to the number of sessions specified in SessionIds, the session will not be closed when the idle timeout elapses.
        // Create a session processor with SessionIdleTimeout
        var options = new ServiceBusSessionProcessorOptions
        {
            SessionIdleTimeout = TimeSpan.FromMinutes(5) // Set the session idle timeout to 5 minutes
        };
    
        var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
    
        // Configure the message handler
        processor.ProcessMessageAsync += MessageHandler;
        processor.ProcessErrorAsync += ErrorHandler;
    
        // Start processing sessions
        await processor.StartProcessingAsync();
    
        Console.WriteLine("Press any key to stop processing...");
        Console.ReadKey();
    
        // Stop processing sessions
        await processor.StopProcessingAsync();
    }
    
    /// <summary>
    /// Prints the body and session id of a received message, then completes the message.
    /// </summary>
    /// <param name="args">Event data carrying the message and its session id.</param>
    static async Task MessageHandler(ProcessSessionMessageEventArgs args)
    {
        Console.WriteLine($"Received message: {args.Message.Body} from session: {args.SessionId}");

        // Completing the message removes it from the session.
        await args.CompleteMessageAsync(args.Message);
    }
    
    /// <summary>
    /// Writes the message of the reported exception to the console.
    /// </summary>
    /// <param name="args">Error event data supplied by the processor.</param>
    /// <returns>An already completed task.</returns>
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        var failure = args.Exception;
        Console.WriteLine($"Error: {failure.Message}");

        return Task.CompletedTask;
    }
    
    
    
    

    Output:

    enter image description here

    Listen for a specific session with processing:

    /// <summary>
    /// Creates the subscription with session support enabled unless it already exists.
    /// </summary>
    /// <param name="connectionString">Connection string passed to the administration client.</param>
    /// <param name="topicName">Topic that owns the subscription.</param>
    /// <param name="subscriptionName">Subscription to check for and, if missing, create.</param>
    static async Task CreateSubscriptionWithSessionAsync(string connectionString, string topicName, string subscriptionName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        bool alreadyExists = await adminClient.SubscriptionExistsAsync(topicName, subscriptionName);
        if (alreadyExists)
        {
            return;
        }

        var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName);
        subscriptionOptions.RequiresSession = true; // Enable session support

        await adminClient.CreateSubscriptionAsync(subscriptionOptions);
    }
    
    /// <summary>
    /// Processes messages from a single session of a session-enabled topic subscription
    /// until a key is pressed, then stops and closes the processor and disposes the client.
    /// </summary>
    /// <param name="connectionString">Connection string used to create the client.</param>
    /// <param name="topicName">Topic to receive from.</param>
    /// <param name="subscriptionName">Session-enabled subscription to receive from.</param>
    /// <param name="sessionId">The only session the processor is scoped to.</param>
    static async Task ProcessMessagesWithSessionAsync(string connectionString, string topicName, string subscriptionName, string sessionId)
    {
        var client = new ServiceBusClient(connectionString);
        try
        {
            var options = new ServiceBusSessionProcessorOptions
            {
                SessionIdleTimeout = TimeSpan.FromMinutes(5), // Set the session idle timeout to 5 minutes

                // Scope the processor to the requested session. Without this the processor is
                // free to accept other sessions, whose messages the handler would simply skip.
                SessionIds = { sessionId }
            };

            var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
            try
            {
                processor.ProcessMessageAsync += (args) => MessageHandler(args, sessionId);
                processor.ProcessErrorAsync += ErrorHandler;

                Console.WriteLine($"Start processing messages from session '{sessionId}' in subscription '{subscriptionName}'...");

                await processor.StartProcessingAsync();

                Console.WriteLine("Press any key to stop processing...");
                Console.ReadKey();

                await processor.StopProcessingAsync();
            }
            finally
            {
                // Close the processor even when starting or stopping failed.
                await processor.CloseAsync();
            }
        }
        finally
        {
            // Release the connection on every exit path.
            await client.DisposeAsync();
        }
    }
    
    /// <summary>
    /// Prints and completes a message when it belongs to the expected session;
    /// messages from any other session are left untouched by this handler.
    /// </summary>
    /// <param name="args">Event data carrying the message and its session id.</param>
    /// <param name="expectedSessionId">Session id a message must have to be handled.</param>
    static async Task MessageHandler(ProcessSessionMessageEventArgs args, string expectedSessionId)
    {
        // NOTE(review): skipped messages are not settled here; what happens to them depends
        // on the processor's auto-complete setting - confirm.
        if (args.SessionId != expectedSessionId)
        {
            return;
        }

        var received = args.Message;
        Console.WriteLine($"Received message from session '{args.SessionId}': {Encoding.UTF8.GetString(received.Body)}");

        await args.CompleteMessageAsync(received);
    }
    
    /// <summary>
    /// Reports a processor error by printing the exception message to the console.
    /// </summary>
    /// <param name="args">Error event data supplied by the processor.</param>
    /// <returns>An already completed task.</returns>
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        string reason = args.Exception.Message;
        Console.WriteLine($"Error: {reason}");

        return Task.CompletedTask;
    }
    
    
    

    enter image description here

        /// <summary>
        /// Ensures the subscription exists, creating it with sessions required when it is missing.
        /// </summary>
        /// <param name="connectionString">Connection string passed to the administration client.</param>
        /// <param name="topicName">Topic that owns the subscription.</param>
        /// <param name="subscriptionName">Subscription to check for and, if missing, create.</param>
        static async Task CreateSubscriptionWithSessionAsync(string connectionString, string topicName, string subscriptionName)
        {
            var adminClient = new ServiceBusAdministrationClient(connectionString);

            bool subscriptionExists = await adminClient.SubscriptionExistsAsync(topicName, subscriptionName);
            if (subscriptionExists)
            {
                // Nothing to do: an existing subscription is left as it is.
                return;
            }

            var creationOptions = new CreateSubscriptionOptions(topicName, subscriptionName);
            creationOptions.RequiresSession = true; // Enable session support

            await adminClient.CreateSubscriptionAsync(creationOptions);
        }
    
        /// <summary>
        /// Processes messages from a single session of a session-enabled topic subscription
        /// and runs an application-side idle timer that fires when no message has been
        /// handled for the idle period, including when no message arrives at all.
        /// Processing continues until a key is pressed.
        /// </summary>
        /// <param name="connectionString">Connection string used to create the client.</param>
        /// <param name="topicName">Topic to receive from.</param>
        /// <param name="subscriptionName">Session-enabled subscription to receive from.</param>
        /// <param name="sessionId">The only session the processor is scoped to.</param>
        static async Task ProcessMessagesWithSessionAsync(string connectionString, string topicName, string subscriptionName, string sessionId)
        {
            // Single source for the idle period used by both the processor and the timer.
            var idleTimeout = TimeSpan.FromMinutes(5);

            var client = new ServiceBusClient(connectionString);
            try
            {
                var options = new ServiceBusSessionProcessorOptions
                {
                    SessionIdleTimeout = idleTimeout, // Set the session idle timeout to 5 minutes

                    // Scope the processor to the requested session. Without this the processor is
                    // free to accept other sessions, whose messages the handler would simply skip.
                    SessionIds = { sessionId }
                };

                // Create the timer disarmed so the message handler always has an instance to reset.
                sessionIdleTimer = new Timer(state => StopProcessingIfIdle(), null, Timeout.Infinite, Timeout.Infinite);

                var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
                try
                {
                    processor.ProcessMessageAsync += (args) => MessageHandler(args, sessionId);
                    processor.ProcessErrorAsync += ErrorHandler;

                    Console.WriteLine($"Start processing messages from session '{sessionId}' in subscription '{subscriptionName}'...");

                    await processor.StartProcessingAsync();

                    // Arm the timer as soon as processing has started. Previously it was only armed
                    // from the message handler, so it never fired when no message was ever received.
                    sessionIdleTimer.Change(idleTimeout, Timeout.InfiniteTimeSpan);

                    Console.WriteLine("Press any key to stop processing...");
                    Console.ReadKey();

                    await processor.StopProcessingAsync();
                }
                finally
                {
                    // Close the processor even when starting or stopping failed.
                    await processor.CloseAsync();
                }
            }
            finally
            {
                // Release the connection on every exit path.
                await client.DisposeAsync();
            }
        }
    
        /// <summary>
        /// Handles a message from the expected session: prints it, restarts the idle timer
        /// and completes the message. Messages from other sessions are left untouched.
        /// </summary>
        /// <param name="args">Event data carrying the message and its session id.</param>
        /// <param name="expectedSessionId">Session id a message must have to be handled.</param>
        static async Task MessageHandler(ProcessSessionMessageEventArgs args, string expectedSessionId)
        {
            if (args.SessionId != expectedSessionId)
            {
                return;
            }

            var received = args.Message;
            Console.WriteLine($"Received message from session '{args.SessionId}': {Encoding.UTF8.GetString(received.Body)}");

            // Activity was seen, so push the one-shot idle callback five minutes into the future.
            sessionIdleTimer.Change(TimeSpan.FromMinutes(5), Timeout.InfiniteTimeSpan);

            await args.CompleteMessageAsync(received);
        }
    
        /// <summary>
        /// Prints the message of the exception reported by the processor.
        /// </summary>
        /// <param name="args">Error event data supplied by the processor.</param>
        /// <returns>An already completed task.</returns>
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            var error = args.Exception;
            Console.WriteLine($"Error: {error.Message}");

            return Task.CompletedTask;
        }
    
        /// <summary>
        /// Idle-timer callback: prints a notice and disarms the timer.
        /// As written it does not stop the processor itself.
        /// </summary>
        static void StopProcessingIfIdle()
        {
            Console.WriteLine("Session is idle. Stopping processing...");

            // Disarm the timer so the callback does not fire again until it is re-armed.
            sessionIdleTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

            // Stop the processing here (you may need to set a flag or trigger another mechanism)
            // For example: Environment.Exit(0);
        }
    }
    
    

    enter image description here