I want to send a scheduled message with Masstransit but I couldn't. I am using it for the first time and I think I missed something in the configuration. Can you help me with the point I missed? The queue is formed as seen in the picture. If I want to send an unscheduled direct message, I can send it without any problems.
public static class BusExtensions
{
public static void AddBusExt(this IServiceCollection services, IConfiguration configuration)
{
var serviceBusOptions = configuration.GetSection(nameof(ServiceBusOption)).Get<ServiceBusOption>();
services.AddScoped<IServiceBus, ServiceBus>();
services.AddMassTransit(x =>
{
x.AddConsumer<ProjectAddedEventConsumer>();
x.AddDelayedMessageScheduler();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(serviceBusOptions!.Url), h => { });
cfg.UseDelayedMessageScheduler();
cfg.ReceiveEndpoint(ServiceBusConst.ProjectAddedEventQueueName, e =>
{
e.ConfigureConsumer<ProjectAddedEventConsumer>(context);
});
});
});
}
}
ScheduleSend
public async Task ScheduleSendAsync<T>(T message, string queueName, DateTime scheduleTime, CancellationToken cancellation = default) where T : class, IEventOrMessage
{
var endpointUri = new Uri($"queue:{queueName}");
await messageScheduler.ScheduleSend(endpointUri, scheduleTime, message, cancellation);
}
Send Message
public async Task<ServiceResult<AddProjectResponse>> AddAsync(AddProjectRequest request)
{
var project = mapper.Map<Project>(request);
await projectRepository.AddAsync(project);
await unitOfWork.SaveChangesAsync();
await busService.ScheduleSendAsync(new ProjectAddedEvent(project.Id, project.Name, project.Description, project.OwnerId, project.ProjectManagerId, project.Link, project.Status, project.Icon, project.Color, project.Tags, project.StartDate, project.EndDate), ServiceBusConst.ProjectAddedEventQueueName, DateTime.UtcNow.AddSeconds(10));
return ServiceResult<AddProjectResponse>.SuccessAsCreated(new AddProjectResponse(project.Id), $"api/projects/{project.Id}");
}
Const Class
public class ServiceBusConst
{
public const string ProjectAddedEventQueueName = "project-added-event-queue";
}
There were two main reasons for the problem. 1. Reason
When I manually received, my bindings were failing. I added the endpoints directly in the context as shown in the documentation. cfg.ConfigureEndpoints(context);
2. Reason
There were old exchanges with the same name and queues connected to them. During the tests, it was not enough to just delete the queue. There are exchanges that the queue is connected to. Since the same exchange already exists, it is not recreated. Since the exchange was not recreated, it could not reconnect while connecting the relevant queue. Clearing all exchanges and running the tests from scratch solved my problem.
Edit:
After cleaning everything, I tested again. I added a queue with a special name as below. It worked again without any problems. The main problem is that if the queues are not deleted in the relevant exchange after deleting them, no error is given during sending, but the sending process does not work either.
cfg.ReceiveEndpoint(ServiceBusConst.ProjectAddedEventQueueName, e =>
{
e.ConfigureConsumer<ProjectAddedEventConsumer>(context);
});