Tags: python, queue

Python Queue module gets stuck when used in threads


I am trying to implement a producer-consumer pipeline using a `Queue`. The producer reads data from a database in one thread and adds items to a queue; those items are in turn consumed by a consumer running in a second thread.

The problem is that whenever I try to add an item to the queue, the code gets stuck on the line `self.inqueue.put(record)`. I have no idea why.

The queue has a `maxsize` to control the flow of data. However, the problem occurs even while the queue is empty.

The code for the producer operation is as follows:

import sqlite3

from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread

from dare.logsetup import Logger

# Module-level logger from the project-specific ``dare.logsetup`` package
# (not part of the standard library).
logger = Logger().get_logger()

class RawData:
    """Stream rows from an SQLite table through a bounded queue (question's code).

    ``process_raw_data`` is the producer and ``clean_converted_data`` the
    consumer; each is run in its own thread. As written the pair deadlocks --
    see the comments in ``clean_converted_data``.
    """
    def __init__(self, 
                 db_path: str) -> None:
        # Stored but not read anywhere in this class.
        self.read_frequency = 1
        self.read_db_path = db_path
        # Bounded queue: put() blocks once 8 items are waiting.
        self.inqueue = Queue(maxsize=8)

    def _setup(self):
        """Open the database and keep a cursor on ``self.cursor``.

        Called from ``process_raw_data``, so it runs in the producer thread.
        The connection object is not stored on ``self`` and is never
        explicitly closed.
        """
        conn = sqlite3.connect(self.read_db_path)
        self.cursor = conn.cursor()

    def fetch_surface_data(self,  
                           read_yet: int, 
                           num_read: int = 1) -> List[Tuple]:
        """Return up to ``num_read`` rows starting at row offset ``read_yet``.

        The result is the ``fetchall()`` list, so with the default
        ``num_read=1`` each call returns a one-element list of row tuples.
        """

        # The values are interpolated directly into the SQL text. Here they are
        # int counters from process_raw_data, but a parameterised query
        # (``LIMIT ? OFFSET ?``) would be safer.
        # NOTE(review): there is no ORDER BY, so SQL does not guarantee a stable
        # row order across successive OFFSET queries.
        query = f"""SELECT seq, "Time & Date", Depth
        FROM "Table Name" 
        LIMIT {num_read} 
        OFFSET {read_yet};"""

        self.cursor.execute(query)
        return self.cursor.fetchall()
    
    def fetch_total_records(self) -> int:
        """Return the row count of "Table Name"."""
        query = """SELECT count(*) FROM "Table Name";"""
        self.cursor.execute(query)
        return self.cursor.fetchone()[0]

    def process_raw_data(self):
        """Producer: queue every row one query at a time, then a ``None`` sentinel."""
        self._setup()
        read_yet = 0
        total_records = self.fetch_total_records()
        
        while read_yet < total_records:
            
            record = self.fetch_surface_data(read_yet)
            read_yet += 1

            logger.info(f"Rows read:{read_yet}")
            # Add the fetched record to the buffer for cleaning. This is where the
            # program hangs: put() needs the queue's internal lock, which the
            # consumer is holding (see clean_converted_data).
            self.inqueue.put(record)
        self.inqueue.put(None) # sentinel value: tells the consumer to stop

    def clean_converted_data(self) -> List[Optional[float]]:
        """Consumer: drain ``self.inqueue`` until the ``None`` sentinel arrives.

        Note: the items collected are whatever ``fetch_surface_data`` returned
        (lists of row tuples), not floats as the annotation says.

        Raises:
            ValueError: if the first item received is the sentinel.
        """
        data = []
        initial_setup_done = False
        last_val = None

        while True:
            # BUG: ``Queue.not_empty`` is an undocumented internal
            # ``threading.Condition``. Entering it holds the single lock that the
            # Queue methods (including the producer's put()) must acquire.
            with self.inqueue.not_empty:
                # BUG: in CPython's queue.py, full() acquires that same
                # non-reentrant lock again, so this thread blocks here forever
                # while still holding it. Even if full() returned, wait_for()
                # expects a callable predicate, not the bool full() returns.
                self.inqueue.not_empty.wait_for(self.inqueue.full())

                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        logger.error("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True

                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)  # Get data from the queue
                        logger.info(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        # Later records only overwrite last_val; they are never
                        # appended to data.
                        last_val = record
                    except Empty:
                        continue  # Continue if no new data is available

if __name__ == '__main__':
    # Placeholder; replace with the path to a real SQLite database file.
    db_path = "path to sqlite db"
    raw = RawData(db_path=db_path)
    
    # Producer and consumer share raw.inqueue.
    producer = Thread(target=raw.process_raw_data)
    logger.info("Starting Producer thread...")
    producer.start()
    
    consumer = Thread(target=raw.clean_converted_data)
    logger.info("Starting Consumer thread...")
    consumer.start()
    
    # Thread discards the target's return value, so the list built by
    # clean_converted_data is not available here.
    producer.join()
    consumer.join()

As no error is raised, I am confused as to how to address the issue. Any advice would be very valuable.


Solution

  • Your implementation is faulty, and the problem has nothing to do with threads per se. To be able to run your script, I rewrote it to eliminate all dependencies on third-party libraries and external databases. It then looks like this:

    from typing import List, Tuple, Optional
    from queue import Queue, Empty
    from threading import Thread
    import time
    
    class RawData:
        """Dependency-free reproduction of the question's producer/consumer.

        Still hangs exactly like the original; see clean_converted_data.
        """
        def __init__(self) -> None:
            # Records produced per second (the producer sleeps 1/read_frequency s).
            self.read_frequency = 1
            # Bounded queue: put() blocks once 8 items are waiting.
            self.inqueue: Queue = Queue(maxsize=8)
    
        def fetch_surface_data(self, read_yet: int) -> str:
            """Stand-in for the database read: return a label for record read_yet."""
            return f"Data {read_yet}"
        
        def process_raw_data(self):
            """Producer: queue 10 string records, then a None sentinel."""
            read_yet = 0
            total_records = 10
            while read_yet < total_records:
                record = self.fetch_surface_data(read_yet)
                read_yet += 1
                print(f"Read:{read_yet}")
                # Hangs here: put() needs the lock the consumer is holding.
                self.inqueue.put(record)
                time.sleep(1.0 / self.read_frequency)
            self.inqueue.put(None) # sentinel value: tells the consumer to stop
    
        def clean_converted_data(self) -> List[Optional[float]]:
            """Consumer from the question, unchanged apart from print() for logging."""
            data = []
            initial_setup_done = False
            last_val = None
    
            while True:
                # BUG: holds Queue's internal lock for the whole block.
                with self.inqueue.not_empty:
                    # BUG: full() tries to take that same non-reentrant lock again
                    # and blocks forever; wait_for() would also reject a bool.
                    self.inqueue.not_empty.wait_for(self.inqueue.full())
    
                    if not initial_setup_done:
                        first_record = self.inqueue.get()
                        if first_record is None:
                            print("First record is None...")
                            raise ValueError("First record is None...")
                        last_val = first_record
                        data.append(last_val)
                        initial_setup_done = True
    
                    while not self.inqueue.empty():
                        try:
                            record = self.inqueue.get(timeout=1)
                            print(f"Consumer thread record: {record}")
                            if record is None:  # Sentinel value to stop the loop
                                return data
                            last_val = record
                            # Append to data seems to be missing??
                        except Empty:
                            continue  # Continue if no new data is available
    
    if __name__ == '__main__':
        # Run the reproduction: producer and consumer in separate threads that
        # share raw.inqueue.
        raw = RawData()
        
        producer = Thread(target=raw.process_raw_data)
        print("Starting Producer thread...")
        producer.start()
        
        consumer = Thread(target=raw.clean_converted_data)
        print("Starting Consumer thread...")
        consumer.start()
        
        # With the buggy consumer, neither of these "closed" lines is reached.
        producer.join()
        print("Producer thread closed")
        consumer.join()
        print("Consumer thread closed")
    

    Instead of reading from a database I am just formatting and passing some strings around. This script also locks up on the statement self.inqueue.put(record). I think it's worth understanding why.

    In your consumer thread you have this statement:

    with self.inqueue.not_empty:
    

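    The member object `Queue.not_empty` is a `threading.Condition`. Its context manager acquires the Condition's lock, which is the same non-reentrant lock that `Queue` uses internally in all of its methods, and holds it until the `with` block exits. Your consumer never leaves that block: the very next statement calls `self.inqueue.full()`, which tries to acquire the same lock again, so the consumer thread deadlocks on itself while still holding the lock. This object is part of the internal implementation of `Queue` and is not documented, so I don't know how you even knew it was in there.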

    When your producer thread adds an object to the queue, `Queue.put` must acquire that same internal lock, both to update the queue and to notify `Queue.not_empty`. It can never do so, because the consumer thread is holding the lock. This, too, is part of the internal implementation of `Queue`. So your program hangs right there.

    This is definitely not the way you're supposed to use Queues. Stick to the documented functions; they have everything you need. The people who wrote the Python standard library knew what they were doing.

    A side note: Your consumer thread also contains this line:

    self.inqueue.not_empty.wait_for(self.inqueue.full())
    

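    This is also a bug, although not a syntax error. The first argument to `Condition.wait_for` must be a callable (a predicate), and you are passing a boolean: the value returned by `full()`. If that line were ever reached, `wait_for` would try to call the boolean and raise a `TypeError`. You don't get an error message because your code never gets that far, for the reasons already explained.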

    The solution is to remove all the code that doesn't use documented functions. The script is shorter and it now works.

    from typing import List, Tuple, Optional
    from queue import Queue, Empty
    from threading import Thread
    import time
    
    class RawData:
        """Demo producer/consumer that passes strings through a bounded queue.

        ``process_raw_data`` is meant to run in one thread and
        ``clean_converted_data`` in another. They communicate only through the
        documented ``Queue.put`` / ``Queue.get`` calls, with ``None`` as the
        end-of-data sentinel.
        """

        def __init__(self) -> None:
            # Records produced per second (the producer sleeps 1/read_frequency s).
            self.read_frequency = 1
            # Bounded: put() blocks once 8 records are waiting, throttling the producer.
            self.inqueue: Queue = Queue(maxsize=8)
    
        def fetch_surface_data(self, read_yet: int) -> str:
            """Stand-in for a database read: return a label for record ``read_yet``."""
            return f"Data {read_yet}"
        
        def process_raw_data(self):
            """Producer: queue 10 records, then a ``None`` end-of-data sentinel.

            The sentinel is queued in a ``finally`` block so that the consumer
            is released even if fetching a record raises. Without it, the
            consumer would block forever in ``Queue.get()``. The exception
            still propagates to the caller.
            """
            read_yet = 0
            total_records = 10
            try:
                while read_yet < total_records:
                    record = self.fetch_surface_data(read_yet)
                    print(f"Read:{read_yet}")
                    read_yet += 1
                    self.inqueue.put(record)
                    time.sleep(1.0 / self.read_frequency)
            finally:
                self.inqueue.put(None)  # sentinel value: tells the consumer to stop
    
        def clean_converted_data(self) -> list[str]:
            """Consumer: collect records from the queue until the sentinel arrives.

            Returns:
                Every record received, in arrival order.

            Raises:
                ValueError: if the very first item is the ``None`` sentinel,
                    i.e. the producer delivered no records at all.
            """
            data: list[str] = []
    
            # Block until the first item arrives; a leading sentinel means no data.
            first_record = self.inqueue.get()
            if first_record is None:
                print("First record is None...")
                raise ValueError("First record is None...")
            print(f"First record {first_record}")
            data.append(first_record)
            while True:
                # get() with no timeout blocks until the producer supplies an item.
                record = self.inqueue.get()
                print(f"Consumer thread record: {record}")
                if record is None:  # Sentinel value to stop the loop
                    break
                data.append(record)
            return data
    
    if __name__ == '__main__':
        raw = RawData()

        # Each entry: (thread, message printed before start, message printed
        # after join). The producer is started first and joined first.
        stages = [
            (Thread(target=raw.process_raw_data),
             "Starting Producer thread...", "Producer thread closed"),
            (Thread(target=raw.clean_converted_data),
             "Starting Consumer thread...", "Consumer thread closed"),
        ]
        for worker, start_msg, _ in stages:
            print(start_msg)
            worker.start()
        for worker, _, done_msg in stages:
            worker.join()
            print(done_msg)
    

    I also moved the code that handles the first item in the queue outside the while loop, and added a `data.append()` statement inside the while loop because it seemed to be missing. I removed the timeout from the `Queue.get()` call since you were suppressing the resulting `Empty` exception anyway.