I am using Azure Functions on the .NET isolated worker model.
I need to send multiple messages to a Service Bus queue,
so I am using the code below:
/// <summary>
/// HTTP POST endpoint ("myroute") that runs the data processing on <c>_myService</c>
/// and returns both the queue messages and the HTTP response in one result object.
/// </summary>
/// <param name="req">The incoming HTTP request (not read by this method).</param>
/// <param name="asyncCollector">Collector binding (not used by this method).</param>
/// <returns>
/// A <see cref="DispatchedMessages"/> whose <c>Messages</c> are the pending messages
/// projected through <c>ToJson()</c> and whose <c>Status</c> is a 200 OK result.
/// </returns>
[Function("process-duplicates")]
public async Task<DispatchedMessages> ProcessDataAsync(
    [Microsoft.Azure.Functions.Worker.HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "myroute")] HttpRequest req,
    IAsyncCollector<string> asyncCollector)
{
    await _myService.ProcessDataAsync();

    // Lazily projected: the sequence is enumerated by whoever consumes Messages.
    var serializedMessages = _myService.MessagesToBeSent.Select(message => message.ToJson());

    var response = new DispatchedMessages();
    response.Messages = serializedMessages;
    response.Status = new OkObjectResult("Processed Data");
    return response;
}
where `DispatchedMessages` is:
/// <summary>
/// Result type returned by the function: carries the messages for the Service Bus
/// output binding together with the HTTP response.
/// </summary>
public class DispatchedMessages
{
    /// <summary>
    /// Message payloads bound to the Service Bus output ("QueueName", using the
    /// "event-bus-connection" connection setting). Marked <c>JsonIgnore</c>.
    /// </summary>
    [JsonIgnore, ServiceBusOutput("QueueName", Connection = "event-bus-connection")]
    public IEnumerable<string> Messages { get; set; }

    /// <summary>The HTTP result handed back to the caller.</summary>
    public IActionResult Status { get; set; }
}
This works fine for a normal queue.
However, I now need to get this to work on a session-enabled queue, so I need to provide a value for the `SessionId`:
// Wrap every pending payload in a ServiceBusMessage that carries the session id.
var messages = _myService.MessagesToBeSent
    .Select(x => new ServiceBusMessage(x.ToJson()) { SessionId = "Test" })
    .ToList();

// Each ServiceBusMessage is passed through ToJson() again to fill the result's Messages.
var result = new DispatchedMessages();
result.Messages = messages.Select(m => m.ToJson());
result.Status = new OkObjectResult("Processed Data");
But this doesn't work.
How can I do this?
Paul
How to use SessionId with ServiceBusOutput on Worker Function azure
To send messages to Service Bus with a session ID, you have to use the Azure SDK (`Azure.Messaging.ServiceBus`) directly instead of the output binding.
Function.cs:
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace RithFunctionApp
{
/// <summary>
/// HTTP-triggered function that sends messages to a Service Bus queue through the
/// Azure SDK client, setting a session id on every message.
/// </summary>
public class Function1
{
    // Name of the queue the messages are sent to.
    private const string QueueName = "rithq1";

    private readonly ILogger<Function1> _logger;
    private readonly ServiceBusClient _client;

    /// <summary>Creates the function class with its injected dependencies.</summary>
    /// <param name="ri">Logger for this function.</param>
    /// <param name="sbc">Service Bus client used to create the queue sender.</param>
    public Function1(ILogger<Function1> ri, ServiceBusClient sbc)
    {
        _logger = ri;
        _client = sbc;
    }

    /// <summary>
    /// Handles GET/POST requests: builds the sample messages, sends them to the
    /// queue and answers with 200 OK.
    /// </summary>
    /// <param name="req">The incoming HTTP request (not read by this method).</param>
    /// <returns>An <see cref="OkObjectResult"/> confirming the messages were sent.</returns>
    [Function("Function1")]
    public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequest req)
    {
        _logger.LogInformation("Hello Rithwik Bojja, Function is sending messages");
        var messagesWithSession = MessageAndSession();
        await SendMessageToServiceBus(messagesWithSession);
        return new OkObjectResult("Hello Rithwik Bojja, Sent Message with id, please check");
    }

    /// <summary>Returns the sample payloads, each paired with its session id.</summary>
    private List<ListTest> MessageAndSession()
    {
        return new List<ListTest>
        {
            new ListTest { Msg = "Rithwik", SessId = "Sess-1" },
            new ListTest { Msg = "Bojja", SessId = "Sess-2" },
        };
    }

    /// <summary>
    /// Sends each entry to the queue as a <see cref="ServiceBusMessage"/> whose
    /// <c>SessionId</c> is taken from the entry.
    /// </summary>
    /// <param name="msgs">Payload/session-id pairs to send, in order.</param>
    private async Task SendMessageToServiceBus(List<ListTest> msgs)
    {
        // ServiceBusSender is IAsyncDisposable: dispose it when done so that a new
        // sender is not left open on every function invocation.
        await using ServiceBusSender sender = _client.CreateSender(QueueName);

        foreach (var entry in msgs)
        {
            var message = new ServiceBusMessage(entry.Msg)
            {
                SessionId = entry.SessId
            };
            await sender.SendMessageAsync(message);
        }
    }
}
/// <summary>Pairs a message payload with the session id it should be sent under.</summary>
public class ListTest
{
    /// <summary>Message body text.</summary>
    public string Msg { get; set; }

    /// <summary>Session id assigned to the message.</summary>
    public string SessId { get; set; }
}
}
Here `rithq1` is the queue name.
Program.cs:
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
// Build the Functions host and register one shared ServiceBusClient for injection.
IHost host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
        // The connection string is read from the "test" setting when the client is first resolved.
        services.AddSingleton(_ => new ServiceBusClient(Environment.GetEnvironmentVariable("test")));
    })
    .Build();

host.Run();
Here `test` is the name of the setting that holds the Service Bus connection string, stored in local.settings.json (never commit a real key to source control).
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"test": "Endpoint=sb://rithtest.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=6YXOZPAvhY="
}
}
Output: