I just wrote a task queue in Python whose job is to limit the number of tasks that are run at one time. This is a little different than Queue.Queue because instead of limiting how many items can be in the queue, it limits how many can be taken out at one time. It still uses an unbounded Queue.Queue to do its job, but it relies on a Semaphore to limit the number of threads:

from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread


class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    of concurrently running tasks.
    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__semaphore = BoundedSemaphore(limit)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()

    def enqueue(self, callback):
        """Indicates that the given callback should be run."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        thread = Thread(target=self.__process_items)
        thread.start()

    def stop(self):
        """Tells the task queue to stop launching new tasks."""
        self.__cancel()
        # prevent blocking on a semaphore.acquire
        self.__semaphore.release()
        # prevent blocking on a Queue.get
        self.__queue.put(lambda: None)

    def __cancel(self):
        print 'canceling'
        with self.__lock:
            self.__cancelled = True

    def __process_items(self):
        while True:
            # see if the queue has been stopped before blocking on acquire
            if self.__is_canceled():
                break
            self.__semaphore.acquire()
            # see if the queue has been stopped before blocking on get
            if self.__is_canceled():
                break
            callback = self.__queue.get()
            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            def runTask():
                try:
                    callback()
                finally:
                    # free the slot so the loop can start another task
                    self.__semaphore.release()

            thread = Thread(target=runTask)
            thread.start()
            self.__queue.task_done()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

The Python interpreter runs forever unless I explicitly stop the task queue. This is a lot more tricky than I thought it would be. If you look at the stop method, you'll see that I set a cancelled flag, release the semaphore and put a no-op callback on the queue. The last two parts are necessary because the code could be blocking on the Semaphore or on the Queue. I basically have to force these to go through so that the loop has a chance to break out.
This code works. This class is useful when running a service that is trying to run thousands of tasks in parallel. In order to keep the machine running smoothly and to prevent the OS from screaming about too many active threads, this code will limit the number of threads living at any one time.
I have written a similar chunk of code in C# before. What made that code particularly cut and dried was that .NET has something called a CancellationToken that just about every threading class uses. Any time there is a blocking operation, that operation takes an optional token. If the parent task is ever canceled, any child tasks blocking with that token will be immediately canceled as well. This seems like a much cleaner way to exit than to "fake it" by releasing semaphores or putting values in a queue.
I was wondering if there is an equivalent way of doing this in Python? I definitely want to be using threads instead of something like asynchronous events. I am wondering if there is a way to achieve the same thing using two Queue.Queue instances, where one has a max size and the other doesn't - but I'm still not sure how to handle cancellation.
You seem to be creating a new thread for each task from the queue. This is wasteful in itself, and also leads you to the problem of how to limit the number of threads.
Instead, a common approach is to create a fixed number of worker threads and let them freely pull tasks from the queue. To cancel the queue, you can clear it and let the workers stay alive in anticipation of future work.
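
A minimal sketch of that approach might look like the following. It is only an illustration, not a drop-in replacement: the names WorkerPool, cancel, shutdown and _STOP are made up for this example, and it assumes Python 3, where the module is spelled queue rather than Queue. The number of worker threads is the concurrency limit, so no semaphore is needed.

```python
import queue
import threading
import traceback


class WorkerPool:
    """Runs queued callables on a fixed number of worker threads."""

    _STOP = object()  # hypothetical sentinel telling one worker to exit

    def __init__(self, num_workers):
        self._tasks = queue.Queue()
        self._workers = [
            threading.Thread(target=self._work, daemon=True)
            for _ in range(num_workers)
        ]
        for worker in self._workers:
            worker.start()

    def enqueue(self, callback):
        """Queue a callable; it runs as soon as a worker is free."""
        self._tasks.put(callback)

    def cancel(self):
        """Discard tasks that have not started yet; workers stay alive.

        Returns the number of discarded tasks. Tasks already running are
        not interrupted. Assumes it is not called while shutdown() is in
        progress, since it would also discard the stop sentinels.
        """
        discarded = 0
        while True:
            try:
                self._tasks.get_nowait()
            except queue.Empty:
                return discarded
            self._tasks.task_done()
            discarded += 1

    def join(self):
        """Block until every queued task has been run or discarded."""
        self._tasks.join()

    def shutdown(self):
        """Let the workers finish what is queued, then exit."""
        for _ in self._workers:
            self._tasks.put(self._STOP)  # one sentinel per worker
        for worker in self._workers:
            worker.join()

    def _work(self):
        while True:
            task = self._tasks.get()  # blocks until something is queued
            try:
                if task is self._STOP:
                    return
                task()
            except Exception:
                # Keep the worker alive if a task fails.
                traceback.print_exc()
            finally:
                self._tasks.task_done()
```

Usage under the same assumptions: create the pool once with pool = WorkerPool(4), call pool.enqueue(some_callable) as work arrives, call pool.cancel() to drop whatever has not started, and call pool.shutdown() before the program exits. If Python 3 is an option, the standard library's concurrent.futures.ThreadPoolExecutor implements the same fixed-pool idea, and from Python 3.9 its shutdown method accepts cancel_futures=True to drop pending work.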