Tags: .net-4.0, task-parallel-library, .net-4.5, parallel-extensions, parallel.foreach

In parallel call, limit executions per second


Using TPL / Parallel.ForEach, is there an out-of-the-box way to limit the number of times a method is called per unit of time (e.g. no more than 50 calls per second)? This is different from limiting the number of threads. Perhaps there's some simple hack to make this work?


Solution

  • One solution is to make a thread-safe version of the following: https://stackoverflow.com/a/7728872/356790

    /// <summary>
    /// Limits the number of requests (method calls, events fired, etc.) that can occur in a given unit of time.
    /// </summary>
    /// <remarks>
    /// The limiter keeps the timestamps of the most recent accepted requests in a queue guarded by a
    /// private lock, so a single instance can be shared by several threads (for example the worker
    /// threads of a Parallel.ForEach loop). A request is accepted when fewer than the maximum number
    /// of requests were accepted during the trailing window; otherwise the caller is blocked and
    /// re-checks periodically.
    /// Timestamps are taken in UTC so that daylight-saving transitions do not distort the window.
    /// Manual adjustments of the system clock can still affect it.
    /// </remarks>
    class RequestLimiter
    {

        #region Constructors

        /// <summary>
        /// Initializes an instance of the RequestLimiter class.
        /// </summary>
        /// <param name="maxRequests">The maximum number of requests that can be made in a given unit of time.</param>
        /// <param name="timeSpan">The unit of time that the maximum number of requests is limited to.</param>
        /// <exception cref="ArgumentException">maxRequests &lt;= 0</exception>
        /// <exception cref="ArgumentException">timeSpan.TotalMilliseconds &lt;= 0</exception>
        public RequestLimiter( int maxRequests , TimeSpan timeSpan )
        {
            // check parameters
            if ( maxRequests <= 0 )
            {
                throw new ArgumentException( "maxRequests <= 0" , "maxRequests" );
            }
            if ( timeSpan.TotalMilliseconds <= 0 )
            {
                throw new ArgumentException( "timeSpan.TotalMilliseconds <= 0" , "timeSpan" );
            }

            // initialize instance vars
            _maxRequests = maxRequests;
            _timeSpan = timeSpan;
            _requestTimes = new Queue<DateTime>( maxRequests );

            // Poll at 1/10th of the window. The value is clamped to int.MaxValue because
            // Convert.ToInt32 throws OverflowException for very long windows (e.g. TimeSpan.MaxValue).
            double pollIntervalInMs = Math.Ceiling( timeSpan.TotalMilliseconds / 10 );
            _sleepTimeInMs = pollIntervalInMs >= int.MaxValue
                ? int.MaxValue
                : Convert.ToInt32( pollIntervalInMs );
        }

        #endregion

        /// <summary>
        /// Blocks the calling thread until a request can be made, then records the request.
        /// </summary>
        public void WaitUntilRequestCanBeMade()
        {
            while ( !TryEnqueueRequest() )
            {
                Thread.Sleep( _sleepTimeInMs );
            }
        }

        #region Private Members

        private readonly Queue<DateTime> _requestTimes;
        private readonly object _requestTimesLock = new object();
        private readonly int _maxRequests;
        private readonly TimeSpan _timeSpan;
        private readonly int _sleepTimeInMs;

        /// <summary>
        /// Removes requests that are older than _timeSpan relative to <paramref name="now"/>.
        /// Must be called while holding _requestTimesLock.
        /// </summary>
        /// <param name="now">The current UTC time, sampled once by the caller.</param>
        private void SynchronizeQueue( DateTime now )
        {
            // Compare the elapsed time against the window instead of adding the window to the
            // timestamp: DateTime.Add throws ArgumentOutOfRangeException when the sum exceeds
            // DateTime.MaxValue, which happens for very large windows.
            while ( ( _requestTimes.Count > 0 ) && ( now - _requestTimes.Peek() > _timeSpan ) )
            {
                _requestTimes.Dequeue();
            }
        }

        /// <summary>
        /// Attempts to enqueue a request.
        /// </summary>
        /// <returns>
        /// Returns true if the request was successfully enqueued.  False if not.
        /// </returns>
        private bool TryEnqueueRequest()
        {
            lock ( _requestTimesLock )
            {
                // Sample the clock once so pruning and enqueueing use the same instant.
                // UTC is used because local time jumps on daylight-saving transitions.
                DateTime now = DateTime.UtcNow;

                SynchronizeQueue( now );
                if ( _requestTimes.Count < _maxRequests )
                {
                    _requestTimes.Enqueue( now );
                    return true;
                }
                return false;
            }
        }

        #endregion

    }