Tags: python, django-views, queue, buffer, producer-consumer

Producer/Consumer - Queue.get by list


I am trying to read data from a database in order to feed it into a machine learning model. However, my producer puts the database rows onto the queue one row at a time, and the consumer likewise gets them one at a time. My machine learning model needs 2000 rows to process.

I want the producer and consumer to put and get a batch of rows at a time, not just one row at a time.

Here is my code in Python:

def read_data():
    """Return everything yielded by ``Torque.get_torque_data()`` as a list.

    The result of ``Torque.get_torque_data()`` is iterated fully so the
    caller receives a plain ``list``.
    """
    # Presumably a Django queryset (the original local was named
    # ``torque_data_queryset``) -- verify against the Torque model.
    rows = list(Torque.get_torque_data())
    return rows


def producer(queue):
    """Put every row from ``read_data()`` onto ``queue``, then a ``None`` sentinel.

    Rows are enqueued one at a time with a one-second pause before each
    ``put``. The trailing ``None`` tells the consumer that no more items
    will arrive.

    Args:
        queue: Queue object shared with the consumer; only ``put`` is used.

    Raises:
        Any exception raised by ``read_data()`` or ``queue.put`` propagates
        to the caller, but only after the sentinel has been enqueued.
    """
    print('Producer: Running')
    try:
        for item in read_data():
            # Pause before each put (kept from the original demo pacing).
            time.sleep(1)
            queue.put(item)
            print(f'> Producer added {item}')
    finally:
        # Always send the sentinel, even if reading or enqueuing failed.
        # Otherwise the consumer would block forever in queue.get() and
        # any join() on the consumer thread would never return.
        queue.put(None)
    print('Producer: Done')

# consumer task
def consumer(queue):
    """Drain ``queue`` until a ``None`` sentinel is received.

    Every value taken from the queue (including the sentinel) is printed
    as-is first. Each non-sentinel value is then followed by a one-second
    pause and a ``> Consumer got ...`` line.

    Args:
        queue: Queue object shared with the producer; only ``get`` is used.
    """
    print('Consumer: Running')
    received = queue.get()
    print(received)
    while received is not None:
        # Pause per item (demo pacing), then report it.
        time.sleep(1)
        print(f'> Consumer got {received}')
        # Fetch the next value; it is echoed before the sentinel check.
        received = queue.get()
        print(received)
    print('Consumer: Done')

def thread_function():
    """Run ``consumer`` and ``producer`` on separate threads sharing one queue.

    The consumer thread is started first, then the producer thread. The
    call returns only after both threads have finished (the producer is
    joined first, then the consumer).
    """
    shared_queue = Queue()

    # Listed in start order: consumer first, producer second.
    workers = [
        Thread(target=consumer, args=(shared_queue,)),
        Thread(target=producer, args=(shared_queue,)),
    ]
    for worker in workers:
        worker.start()

    # Join in the opposite order: producer first, then consumer.
    for worker in reversed(workers):
        worker.join()
    
def thread_result(request):
    """Run the producer/consumer threads, then render the result page.

    ``thread_function()`` is called synchronously, so the response is only
    produced after both threads have finished.

    Args:
        request: The request object, passed through to ``render``.

    Returns:
        Whatever ``render`` returns for ``async_processing_result.html``
        with a success message in the context.
    """
    thread_function()
    context = {'message': 'Threads completed successfully!'}
    return render(request, 'async_processing_result.html', context)

Solution

  • You can put() a list of objects onto queue.Queue. The consumer will get() the entire list at once.

    For example:

    import threading
    import queue
    
    
    def consumer(q: queue.Queue):
        """Print each batch taken from ``q``, one element per line, until ``None`` arrives."""
        while True:
            batch = q.get()
            if batch is None:
                # Sentinel: the producer has nothing more to send.
                break
            print(*batch, sep="\n")
    
    
    def producer(q: queue.Queue):
        """Put one list of ten sample dictionaries on ``q``, followed by ``None``."""
        # Build the whole batch first: {"int": 0, "chr": "a"} .. {"int": 9, "chr": "j"}
        rows = []
        for offset in range(10):
            rows.append({"int": offset, "chr": chr(ord("a") + offset)})
        # The entire list travels through the queue as a single item.
        q.put(rows)
        # Sentinel telling the consumer that no more batches follow.
        q.put(None)
    
    
    def main():
        """Start ``producer`` and ``consumer`` threads on one queue and wait for both."""
        q = queue.Queue()
        workers = []
        # Each thread is started as soon as it is created: producer, then consumer.
        for target in (producer, consumer):
            worker = threading.Thread(target=target, args=(q,))
            worker.start()
            workers.append(worker)
        # Wait for the threads in the order they were started.
        for worker in workers:
            worker.join()
    
    
    # Run the demo only when this file is executed as a script, not when it is imported.
    if __name__ == "__main__":
        main()
    

    Output:

    {'int': 0, 'chr': 'a'}
    {'int': 1, 'chr': 'b'}
    {'int': 2, 'chr': 'c'}
    {'int': 3, 'chr': 'd'}
    {'int': 4, 'chr': 'e'}
    {'int': 5, 'chr': 'f'}
    {'int': 6, 'chr': 'g'}
    {'int': 7, 'chr': 'h'}
    {'int': 8, 'chr': 'i'}
    {'int': 9, 'chr': 'j'}