asp.net-corerabbitmqmasstransit

How to send timed messages with Masstransit?


I want to send a scheduled message with Masstransit but I couldn't. I am using it for the first time and I think I missed something in the configuration. Can you help me with the point I missed? The queue is formed as seen in the picture. If I want to send an unscheduled direct message, I can send it without any problems.

enter image description here Configration

     public static class BusExtensions
{
    public static void AddBusExt(this IServiceCollection services, IConfiguration configuration)
    {
        var serviceBusOptions = configuration.GetSection(nameof(ServiceBusOption)).Get<ServiceBusOption>();

        services.AddScoped<IServiceBus, ServiceBus>();

        services.AddMassTransit(x =>
        {
            x.AddConsumer<ProjectAddedEventConsumer>();

            x.AddDelayedMessageScheduler();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(new Uri(serviceBusOptions!.Url), h => { });

                cfg.UseDelayedMessageScheduler();

                cfg.ReceiveEndpoint(ServiceBusConst.ProjectAddedEventQueueName, e =>
                {
                    e.ConfigureConsumer<ProjectAddedEventConsumer>(context);
                });
            });
        });
    }

}

ScheduleSend

public async Task ScheduleSendAsync<T>(T message, string queueName, DateTime scheduleTime, CancellationToken cancellation = default) where T : class, IEventOrMessage
{
    var endpointUri = new Uri($"queue:{queueName}");

    await messageScheduler.ScheduleSend(endpointUri, scheduleTime, message, cancellation);
}

Send Message

public async Task<ServiceResult<AddProjectResponse>> AddAsync(AddProjectRequest request)
{
    var project = mapper.Map<Project>(request);
    await projectRepository.AddAsync(project);
    await unitOfWork.SaveChangesAsync();

    await busService.ScheduleSendAsync(new ProjectAddedEvent(project.Id, project.Name, project.Description, project.OwnerId, project.ProjectManagerId, project.Link, project.Status, project.Icon, project.Color, project.Tags, project.StartDate, project.EndDate), ServiceBusConst.ProjectAddedEventQueueName, DateTime.UtcNow.AddSeconds(10));

    return ServiceResult<AddProjectResponse>.SuccessAsCreated(new AddProjectResponse(project.Id), $"api/projects/{project.Id}");
}

Const Class

public class ServiceBusConst
{
    public const string ProjectAddedEventQueueName = "project-added-event-queue";
}

Solution

  • There were two main reasons for the problem. 1. Reason

    When I manually received, my bindings were failing. I added the endpoints directly in the context as shown in the documentation. cfg.ConfigureEndpoints(context);

    2. Reason

    There were old exchanges with the same name and queues connected to them. During the tests, it was not enough to just delete the queue. There are exchanges that the queue is connected to. Since the same exchange already exists, it is not recreated. Since the exchange was not recreated, it could not reconnect while connecting the relevant queue. Clearing all exchanges and running the tests from scratch solved my problem.

    Edit:

    After cleaning everything, I tested again. I added a queue with a special name as below. It worked again without any problems. The main problem is that if the queues are not deleted in the relevant exchange after deleting them, no error is given during sending, but the sending process does not work either.

    cfg.ReceiveEndpoint(ServiceBusConst.ProjectAddedEventQueueName, e =>
    {
        e.ConfigureConsumer<ProjectAddedEventConsumer>(context);
    });