I am trying to implement producer-consumer functionality on top of a Queue.
The producer reads data from a database in one thread and adds each item to a queue, which is in turn consumed by a consumer further down the line in a second thread.
The problem is that whenever I try to add an item to the queue, the code gets stuck on the line inqueue.put(record), and I have no idea why.
The queue has a maxsize set to control the flow of data, but the problem occurs even while the queue is empty.
The code for the producer and consumer is as follows:
import sqlite3
from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread

from dare.logsetup import Logger

logger = Logger().get_logger()


class RawData:
    def __init__(self, db_path: str) -> None:
        self.read_frequency = 1
        self.read_db_path = db_path
        self.inqueue = Queue(maxsize=8)

    def _setup(self):
        conn = sqlite3.connect(self.read_db_path)
        self.cursor = conn.cursor()

    def fetch_surface_data(self, read_yet: int, num_read: int = 1) -> List[Tuple]:
        query = f"""SELECT seq, "Time & Date", Depth
                    FROM "Table Name"
                    LIMIT {num_read}
                    OFFSET {read_yet};"""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def fetch_total_records(self) -> int:
        query = """SELECT count(*) FROM "Table Name";"""
        self.cursor.execute(query)
        return self.cursor.fetchone()[0]

    def process_raw_data(self):
        self._setup()
        read_yet = 0
        total_records = self.fetch_total_records()
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            read_yet += 1
            logger.info(f"Rows read:{read_yet}")
            # Add converted_record to buffer for cleaning
            self.inqueue.put(record)
        self.inqueue.put(None)  # sentinel value

    def clean_converted_data(self) -> List[Optional[float]]:
        data = []
        initial_setup_done = False
        last_val = None
        while True:
            with self.inqueue.not_empty:
                self.inqueue.not_empty.wait_for(self.inqueue.full())
                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        logger.error("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True
                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)  # Get data from the queue
                        logger.info(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        last_val = record
                    except Empty:
                        continue  # Continue if no new data is available


if __name__ == '__main__':
    db_path = "path to sqlite db"
    raw = RawData(db_path=db_path)

    producer = Thread(target=raw.process_raw_data)
    logger.info("Starting Producer thread...")
    producer.start()

    consumer = Thread(target=raw.clean_converted_data)
    logger.info("Starting Consumer thread...")
    consumer.start()

    producer.join()
    consumer.join()
As there is no error thrown, I am confused as to how to address the issue. Your advice would be very valuable.
Your implementation is faulty, and the fault has nothing to do with threads per se. To be able to run your script, I rewrote it to eliminate all dependence on third-party libraries and external databases. It then looks like this:
from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread
import time


class RawData:
    def __init__(self) -> None:
        self.read_frequency = 1
        self.inqueue: Queue = Queue(maxsize=8)

    def fetch_surface_data(self, read_yet: int) -> str:
        return f"Data {read_yet}"

    def process_raw_data(self):
        read_yet = 0
        total_records = 10
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            read_yet += 1
            print(f"Read:{read_yet}")
            self.inqueue.put(record)
            time.sleep(1.0 / self.read_frequency)
        self.inqueue.put(None)  # sentinel value

    def clean_converted_data(self) -> List[Optional[float]]:
        data = []
        initial_setup_done = False
        last_val = None
        while True:
            with self.inqueue.not_empty:
                self.inqueue.not_empty.wait_for(self.inqueue.full())
                if not initial_setup_done:
                    first_record = self.inqueue.get()
                    if first_record is None:
                        print("First record is None...")
                        raise ValueError("First record is None...")
                    last_val = first_record
                    data.append(last_val)
                    initial_setup_done = True
                while not self.inqueue.empty():
                    try:
                        record = self.inqueue.get(timeout=1)
                        print(f"Consumer thread record: {record}")
                        if record is None:  # Sentinel value to stop the loop
                            return data
                        last_val = record
                        # Append to data seems to be missing??
                    except Empty:
                        continue  # Continue if no new data is available


if __name__ == '__main__':
    raw = RawData()

    producer = Thread(target=raw.process_raw_data)
    print("Starting Producer thread...")
    producer.start()

    consumer = Thread(target=raw.clean_converted_data)
    print("Starting Consumer thread...")
    consumer.start()

    producer.join()
    print("Producer thread closed")
    consumer.join()
    print("Consumer thread closed")
Instead of reading from a database, I am just formatting and passing some strings around. This script also locks up on the statement self.inqueue.put(record), and I think it's worth understanding why.
In your consumer thread you have this statement:
with self.inqueue.not_empty:
The member object Queue.not_empty is a threading.Condition, and its context manager acquires the Condition's underlying lock, which is the Queue's single internal mutex. That lock is shared by everything the Queue does: put(), get(), full() and empty() all have to take it. This object is part of the internal implementation of Queue and is not documented, so I don't know how you even knew it was in there.
While the consumer thread is inside the with block it holds that lock, and because the lock is not reentrant, the very next call to self.inqueue.full() blocks the consumer forever. Meanwhile, when your producer thread adds an object to the Queue, put() must acquire the same lock in order to notify Queue.not_empty, and it can never do that because the consumer thread is holding it. So your program hangs right there, even when the queue is empty.
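To see why the lock is shared, here is a simplified sketch of how queue.Queue is built internally (paraphrased from CPython's queue.py, with most details omitted); it is only an illustration, not code you should write yourself:

import collections
import threading

class SimplifiedQueue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self.queue = collections.deque()
        self.mutex = threading.Lock()                      # one non-reentrant lock
        self.not_empty = threading.Condition(self.mutex)   # both Conditions share it
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:                                # needs the shared mutex
            while 0 < self.maxsize <= len(self.queue):
                self.not_full.wait()
            self.queue.append(item)
            self.not_empty.notify()                        # wakes a blocked get()

    def full(self):
        with self.mutex:                                   # also needs the shared mutex
            return 0 < self.maxsize <= len(self.queue)

Once your consumer has entered the with self.inqueue.not_empty: block, every one of these calls made from any thread has to wait for that same mutex.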
This is definitely not the way you're supposed to use Queues. Stick to the documented functions; they have everything you need. The people who wrote the Python standard library knew what they were doing.
A side note: Your consumer thread also contains this line:
self.inqueue.not_empty.wait_for(self.inqueue.full())
This is broken in a second way: the first argument to Condition.wait_for is supposed to be a callable (a predicate that wait_for can call repeatedly), but you are passing a boolean - the value returned by full(). If that call were ever reached it would raise a TypeError, but you don't get an error message because your code never gets that far, for the reasons already explained.
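If you ever do need Condition.wait_for, use it on a Condition you own (not the Queue's internal one) and pass the predicate itself, so that wait_for can re-evaluate it each time it wakes up. A minimal sketch, unrelated to Queue:

import threading

cond = threading.Condition()
items: list[int] = []

def consumer():
    with cond:
        # Pass the callable, not its result; wait_for re-checks it after each notify.
        cond.wait_for(lambda: len(items) >= 3)
        print("consumer saw:", items)

def producer():
    for i in range(3):
        with cond:
            items.append(i)
            cond.notify()

t = threading.Thread(target=consumer)
t.start()
producer()
t.join()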
The solution is to remove all the code that relies on these undocumented internals and use only the documented methods. The script is shorter, and it now works.
from typing import List, Tuple, Optional
from queue import Queue, Empty
from threading import Thread
import time


class RawData:
    def __init__(self) -> None:
        self.read_frequency = 1
        self.inqueue: Queue = Queue(maxsize=8)

    def fetch_surface_data(self, read_yet: int) -> str:
        return f"Data {read_yet}"

    def process_raw_data(self):
        read_yet = 0
        total_records = 10
        while read_yet < total_records:
            record = self.fetch_surface_data(read_yet)
            print(f"Read:{read_yet}")
            read_yet += 1
            self.inqueue.put(record)
            time.sleep(1.0 / self.read_frequency)
        self.inqueue.put(None)  # sentinel value

    def clean_converted_data(self) -> list[str]:
        data: list[str] = []
        first_record = self.inqueue.get()
        if first_record is None:
            print("First record is None...")
            raise ValueError("First record is None...")
        print(f"First record {first_record}")
        data.append(first_record)
        while True:
            record = self.inqueue.get()
            print(f"Consumer thread record: {record}")
            if record is None:  # Sentinel value to stop the loop
                break
            data.append(record)
        return data


if __name__ == '__main__':
    raw = RawData()

    producer = Thread(target=raw.process_raw_data)
    print("Starting Producer thread...")
    producer.start()

    consumer = Thread(target=raw.clean_converted_data)
    print("Starting Consumer thread...")
    consumer.start()

    producer.join()
    print("Producer thread closed")
    consumer.join()
    print("Consumer thread closed")
I also moved the code that handles the first item in the queue outside the while loop, and added a data.append() statement inside the while loop because it seemed to be missing. I removed the timeout for the Queue.get() call since you were suppressing the Empty exception anyway.
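If you do want a timeout on Queue.get() - for example so the consumer can periodically check a shutdown flag instead of blocking forever - then the Empty exception is the place to do that check, not something to swallow silently. A rough sketch of that pattern (the stop_requested event here is purely illustrative and not part of your code):

import threading
from queue import Queue, Empty

inqueue: Queue = Queue(maxsize=8)
stop_requested = threading.Event()

def consume() -> list:
    data = []
    while not stop_requested.is_set():
        try:
            record = inqueue.get(timeout=1)   # wake up at least once per second
        except Empty:
            continue                          # nothing arrived; re-check the stop flag
        if record is None:                    # sentinel from the producer
            break
        data.append(record)
    return data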