asp.net-corehangfire

Hangfire - DisableConcurrentExecution - Prevent concurrent execution if same value passed in method parameter


Hangfire DisableConcurrentExecution attribute not working as expected.

I have one method and that can be called with different Id. I want to prevent concurrent execution of method if same Id is passed.

string jobName= $"{Id} - Entry Job";

_recurringJobManager.AddOrUpdate<EntryJob>(jobName, j => j.RunAsync(Id, Null), "0 2 * * *");

My EntryJob interface having RunAsync method.

public class EntryJob:  IJob
  {
 [DisableConcurrentExecution(3600)] <-- Tried here
public async Task RunAsync(int Id, SomeObj obj)
    {
      //Some coe
    }
  }

And interface look like this

 [DisableConcurrentExecution(3600)] <-- Tried here
    public interface IJob
      {
       [DisableConcurrentExecution(3600)] <-- Tried here
        Task RunAsync(int Id, SomeObj obj);
      }

Now I want to prevent RunAsync method to call multiple times if Id is same. I have tried to put DisableConcurrentExecution on top of the RunAsync method at both location inside interface declaration and also from where Interface is implemented.

But it seems like not working for me. Is there any way to prevent concurrency based on Id?


Solution

  • The existing implementation of DisableConcurrentExecution does not support this. It will prevent concurrent executions of the method with any args. It would be fairly simple to add support in. Note below is untested pseudo-code:

    public class DisableConcurrentExecutionWithArgAttribute : JobFilterAttribute, IServerFilter
    {
        private readonly int _timeoutInSeconds;
        private readonly int _argPos;
    
        // add additional param to pass in which method arg you want to use for 
        // deduping jobs
        public DisableConcurrentExecutionAttribute(int timeoutInSeconds, int argPos)
        {
            if (timeoutInSeconds < 0) throw new ArgumentException("Timeout argument value should be greater that zero.");
    
            _timeoutInSeconds = timeoutInSeconds;
            _argPos = argPos;
        }
    
        public void OnPerforming(PerformingContext filterContext)
        {
            var resource = GetResource(filterContext.BackgroundJob.Job);
    
            var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);
    
            var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
            filterContext.Items["DistributedLock"] = distributedLock;
        }
    
        public void OnPerformed(PerformedContext filterContext)
        {
            if (!filterContext.Items.ContainsKey("DistributedLock"))
            {
                throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
            }
    
            var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
            distributedLock.Dispose();
        }
    
        private static string GetResource(Job job)
        {
            // adjust locked resource to include the argument to make it unique
            // for a given ID
            return $"{job.Type.ToGenericTypeString()}.{job.Method.Name}.{job.Args[_argPos].ToString()}";
        }
    }