I'm using an Azure Service Bus topic with C# and ASP.NET Core. I have created a session-enabled subscription and I'm using ServiceBusSessionProcessor to receive messages from this topic.
I would like to set SessionIdleTimeout to 15 seconds and handle an event after this time has passed. If no message is received, I would like to update a record in my database and stop processing.
Is this possible? Is there any way I can know that no message was received in those 15 seconds?
Some of the code:
/// <summary>
/// Builds a session processor for one session of a topic subscription,
/// registers its handlers and starts it before the constructor returns.
/// </summary>
/// <param name="handler">Message handler kept in <c>_handler</c>.</param>
/// <param name="topicName">Topic the processor is created for.</param>
/// <param name="subscriptionName">Subscription the processor is created for.</param>
/// <param name="sessionId">The single session id added to the processor's <c>SessionIds</c>.</param>
/// <param name="serviceBusPersisterConnection">Supplies the client used to create the processor.</param>
/// <param name="messageHelper">Helper kept in <c>_messageHelper</c>.</param>
/// <param name="logger">Logger kept in <c>_logger</c>.</param>
public BaseSessionMessageProcessor(
    IMessageHandler<T> handler,
    string topicName,
    string subscriptionName,
    string sessionId,
    IServiceBusPersisterConnection serviceBusPersisterConnection,
    IMessageHelper messageHelper,
    IAppLogger<T> logger)
{
    _logger = logger;
    _messageHelper = messageHelper;
    _handler = handler;

    // Messages are not auto-completed, so the message handler is responsible
    // for settling them.
    var processorOptions = new ServiceBusSessionProcessorOptions
    {
        AutoCompleteMessages = false,
        MaxConcurrentCallsPerSession = 1,
        MaxConcurrentSessions = 2,
        SessionIdleTimeout = TimeSpan.FromSeconds(15)
    };

    // NOTE(review): one session id with MaxConcurrentSessions = 2 matches the
    // documented case in which the session is NOT closed when the idle
    // timeout elapses - confirm this is the intended behaviour.
    processorOptions.SessionIds.Add(sessionId);

    var client = serviceBusPersisterConnection.ServiceBusClient;
    _processor = client.CreateSessionProcessor(topicName, subscriptionName, processorOptions);

    // Blocks the constructing thread until the processor has started.
    RegisterSessionMessageProcessor().GetAwaiter().GetResult();
}
/// <summary>
/// Attaches the message and error handlers to the processor and starts it.
/// </summary>
/// <returns>A task that completes once <c>StartProcessingAsync</c> has finished.</returns>
public async Task RegisterSessionMessageProcessor()
{
    // Both handlers are attached before the processor is started.
    _processor.ProcessErrorAsync += ErrorHandler;
    _processor.ProcessMessageAsync += SessionMessageHandler;

    Task startTask = _processor.StartProcessingAsync();
    await startTask;
}
/// <summary>
/// Handles one message delivered for the active session: logs that
/// processing started and then completes the message.
/// </summary>
/// <param name="args">Event arguments carrying the received message and its settlement methods.</param>
/// <returns>A task that completes once the message has been settled.</returns>
public async Task SessionMessageHandler(ProcessSessionMessageEventArgs args)
{
    _logger.LogInformation("log-info: process started");

    // The processor is created with AutoCompleteMessages = false, so the
    // message must be settled here; otherwise it stays locked and is
    // delivered again later.
    await args.CompleteMessageAsync(args.Message);
}
/// <summary>
/// Logs an error raised by the processor.
/// </summary>
/// <param name="args">Event arguments carrying the exception and the error source.</param>
/// <returns>An already-completed task.</returns>
public Task ErrorHandler(ProcessErrorEventArgs args)
{
    // NOTE(review): the exception and the error source are passed as extra
    // arguments, but the message has no placeholders for them - confirm that
    // IAppLogger<T>.LogError actually records them.
    _logger.LogError(
        "log-exception: An error occurred while trying to handle a message.",
        args.Exception,
        args.ErrorSource);

    return Task.CompletedTask;
}
The `SessionIdleTimeout` property of the `ServiceBusSessionProcessorOptions` class:
- Gets or sets the maximum amount of time to wait for a message to be received for the currently active session.
- After this time has elapsed, the processor will close the session and attempt to process another session.
- If `SessionIds` is populated and `MaxConcurrentSessions` is greater than or equal to the number of sessions specified in `SessionIds`, the session will not be closed when the idle timeout elapses.
The code below creates a `ServiceBusSessionProcessor` with the `SessionIdleTimeout` property set. It defines the maximum amount of time the processor will wait for a message to be received for the currently active session. If no message is received within this time frame, the processor will close the session and attempt to process another session.
// Create a session processor with SessionIdleTimeout
// Processor options: wait at most 5 minutes for a message on the active session.
var processorOptions = new ServiceBusSessionProcessorOptions();
processorOptions.SessionIdleTimeout = TimeSpan.FromMinutes(5);

var sessionProcessor = client.CreateSessionProcessor(topicName, subscriptionName, processorOptions);

// Attach the message and error callbacks before starting.
sessionProcessor.ProcessMessageAsync += MessageHandler;
sessionProcessor.ProcessErrorAsync += ErrorHandler;

// Run until the user presses a key.
await sessionProcessor.StartProcessingAsync();
Console.WriteLine("Press any key to stop processing...");
Console.ReadKey();

// Shut the processor down again.
await sessionProcessor.StopProcessingAsync();
}
/// <summary>
/// Prints the body and session id of a received message, then completes it.
/// </summary>
/// <param name="args">Event arguments carrying the received session message.</param>
static async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
    var received = args.Message;

    // Report what arrived and which session it belongs to.
    Console.WriteLine($"Received message: {received.Body} from session: {args.SessionId}");

    // Completing the message removes it from the session.
    await args.CompleteMessageAsync(received);
}
/// <summary>
/// Writes the message of a processing error to the console.
/// </summary>
/// <param name="args">Event arguments carrying the exception that occurred.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string reason = args.Exception.Message;
    Console.WriteLine($"Error: {reason}");

    return Task.CompletedTask;
}
Output:
Listen for a specific session with processing:
/// <summary>
/// Creates a session-enabled subscription on the topic unless it already exists.
/// </summary>
/// <param name="connectionString">Connection string passed to the administration client.</param>
/// <param name="topicName">Topic that owns the subscription.</param>
/// <param name="subscriptionName">Name of the subscription to create.</param>
static async Task CreateSubscriptionWithSessionAsync(string connectionString, string topicName, string subscriptionName)
{
    var adminClient = new ServiceBusAdministrationClient(connectionString);

    // Nothing to do when the subscription is already there.
    bool alreadyExists = await adminClient.SubscriptionExistsAsync(topicName, subscriptionName);
    if (alreadyExists)
    {
        return;
    }

    var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName);
    subscriptionOptions.RequiresSession = true; // Enable session support

    await adminClient.CreateSubscriptionAsync(subscriptionOptions);
}
/// <summary>
/// Processes messages of a single session from a topic subscription until
/// the user presses a key.
/// </summary>
/// <param name="connectionString">Connection string used to create the Service Bus client.</param>
/// <param name="topicName">Topic to receive from.</param>
/// <param name="subscriptionName">Session-enabled subscription to receive from.</param>
/// <param name="sessionId">The only session this processor should accept.</param>
/// <remarks>
/// The processor is restricted to <paramref name="sessionId"/> through
/// <c>SessionIds</c>. Without that restriction it would accept any session,
/// and the handler would ignore messages from other sessions without
/// settling them. Per the <c>SessionIdleTimeout</c> documentation, a
/// processor whose <c>MaxConcurrentSessions</c> is greater than or equal to
/// the number of ids in <c>SessionIds</c> does not close the session when
/// the idle timeout elapses.
/// </remarks>
static async Task ProcessMessagesWithSessionAsync(string connectionString, string topicName, string subscriptionName, string sessionId)
{
    var client = new ServiceBusClient(connectionString);
    try
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            SessionIds = { sessionId }, // Only accept the requested session
            SessionIdleTimeout = TimeSpan.FromMinutes(5) // Set the session idle timeout to 5 minutes
        };
        var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
        try
        {
            processor.ProcessMessageAsync += (args) => MessageHandler(args, sessionId);
            processor.ProcessErrorAsync += ErrorHandler;

            Console.WriteLine($"Start processing messages from session '{sessionId}' in subscription '{subscriptionName}'...");
            await processor.StartProcessingAsync();

            Console.WriteLine("Press any key to stop processing...");
            Console.ReadKey();

            await processor.StopProcessingAsync();
        }
        finally
        {
            // Release the processor even when starting or stopping failed.
            await processor.CloseAsync();
        }
    }
    finally
    {
        // Release the connection on every exit path.
        await client.DisposeAsync();
    }
}
/// <summary>
/// Prints and completes a message when it belongs to the expected session;
/// messages from any other session are left untouched.
/// </summary>
/// <param name="args">Event arguments carrying the received session message.</param>
/// <param name="expectedSessionId">Session id the message must belong to.</param>
static async Task MessageHandler(ProcessSessionMessageEventArgs args, string expectedSessionId)
{
    // Messages from other sessions are neither printed nor settled.
    if (args.SessionId != expectedSessionId)
    {
        return;
    }

    var received = args.Message;
    string bodyText = Encoding.UTF8.GetString(received.Body);
    Console.WriteLine($"Received message from session '{args.SessionId}': {bodyText}");

    await args.CompleteMessageAsync(received);
}
/// <summary>
/// Reports a processing error on the console.
/// </summary>
/// <param name="args">Event arguments carrying the exception that occurred.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    var failure = args.Exception;
    Console.WriteLine($"Error: {failure.Message}");

    return Task.CompletedTask;
}
Enable sessions
when creating a subscription and add various filters with properties. static async Task CreateSubscriptionWithSessionAsync(string connectionString, string topicName, string subscriptionName)
{
// Creates the subscription `subscriptionName` on `topicName` with sessions
// required, but only when it does not exist yet.
var client = new ServiceBusAdministrationClient(connectionString);
// Skip creation when the subscription is already present.
if (!await client.SubscriptionExistsAsync(topicName, subscriptionName))
{
var options = new CreateSubscriptionOptions(topicName, subscriptionName)
{
RequiresSession = true // Enable session support
};
await client.CreateSubscriptionAsync(options);
}
}
/// <summary>
/// Processes messages of a single session from a topic subscription until
/// the user presses a key, and runs an idle timer that calls
/// <c>StopProcessingIfIdle</c> when no message arrives in time.
/// </summary>
/// <param name="connectionString">Connection string used to create the Service Bus client.</param>
/// <param name="topicName">Topic to receive from.</param>
/// <param name="subscriptionName">Session-enabled subscription to receive from.</param>
/// <param name="sessionId">The only session this processor should accept.</param>
/// <remarks>
/// The idle timer is armed as soon as processing has started. If it were
/// only armed by the first received message, a session that never receives
/// anything would never trigger the idle callback.
/// </remarks>
static async Task ProcessMessagesWithSessionAsync(string connectionString, string topicName, string subscriptionName, string sessionId)
{
    // One value drives both the processor option and the initial timer period.
    var idleTimeout = TimeSpan.FromMinutes(5);

    var client = new ServiceBusClient(connectionString);
    try
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            SessionIds = { sessionId }, // Only accept the requested session
            SessionIdleTimeout = idleTimeout // Set the session idle timeout to 5 minutes
        };

        // The timer is created disarmed; it is armed once processing has started.
        sessionIdleTimer = new Timer(state => StopProcessingIfIdle(), null, Timeout.Infinite, Timeout.Infinite);

        var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
        try
        {
            processor.ProcessMessageAsync += (args) => MessageHandler(args, sessionId);
            processor.ProcessErrorAsync += ErrorHandler;

            Console.WriteLine($"Start processing messages from session '{sessionId}' in subscription '{subscriptionName}'...");
            await processor.StartProcessingAsync();

            // Start the idle countdown now so that "no message at all" is detected too.
            sessionIdleTimer.Change(idleTimeout, Timeout.InfiniteTimeSpan);

            Console.WriteLine("Press any key to stop processing...");
            Console.ReadKey();

            await processor.StopProcessingAsync();
        }
        finally
        {
            try
            {
                // Release the processor even when starting or stopping failed.
                await processor.CloseAsync();
            }
            finally
            {
                // No handler can re-arm the timer any more; make sure it stays silent.
                sessionIdleTimer.Change(Timeout.Infinite, Timeout.Infinite);
            }
        }
    }
    finally
    {
        // Release the connection on every exit path.
        await client.DisposeAsync();
    }
}
/// <summary>
/// Prints a message from the expected session, restarts the idle timer and
/// completes the message; messages from any other session are left untouched.
/// </summary>
/// <param name="args">Event arguments carrying the received session message.</param>
/// <param name="expectedSessionId">Session id the message must belong to.</param>
static async Task MessageHandler(ProcessSessionMessageEventArgs args, string expectedSessionId)
{
    // Messages from other sessions are neither printed nor settled.
    if (args.SessionId != expectedSessionId)
    {
        return;
    }

    var received = args.Message;
    string bodyText = Encoding.UTF8.GetString(received.Body);
    Console.WriteLine($"Received message from session '{args.SessionId}': {bodyText}");

    // A message arrived, so the idle countdown starts over (one-shot, 5 minutes).
    TimeSpan idleWindow = TimeSpan.FromMinutes(5);
    sessionIdleTimer.Change(idleWindow, Timeout.InfiniteTimeSpan);

    await args.CompleteMessageAsync(received);
}
/// <summary>
/// Prints the message of an error raised during processing.
/// </summary>
/// <param name="args">Event arguments carrying the exception that occurred.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string errorText = args.Exception.Message;
    Console.WriteLine($"Error: {errorText}");

    return Task.CompletedTask;
}
/// <summary>
/// Idle-timer callback: announces that the session is idle and switches the
/// timer off.
/// </summary>
static void StopProcessingIfIdle()
{
    const int Disabled = Timeout.Infinite;

    // Runs when the session has been idle for the configured timeout.
    Console.WriteLine("Session is idle. Stopping processing...");
    sessionIdleTimer.Change(Disabled, Disabled);

    // This method does not stop the processor itself; the actual stop has to
    // be triggered from here (for example by setting a flag or another mechanism).
    // For example: Environment.Exit(0);
}
}