I am trying to create a queueing system in python using Firestore. Even though I am using transactional functions, my simple test case has different outcomes on different runs.
The queue is to represent work tasks to operate on a Firestore document. The queue itself is a sub-collection on the document.
There's one special behavior for this queue: all tasks could be run concurrently. The task status is stored on the task and any task in the queue can be marked as completed at any time. BUT if the task being set to completed is the first item in the queue, then the system goes through sequentially and purges all completed tasks until the first non-completed shows up. It has a placeholder so the last "popped" task can be used to trigger other actions.
There are three files in this example:
queue_task.py
- the object type stored in the queuefirestore_queue_client.py
- the queue implementation in a Firestore clientmain.py
- the main test case itselfThe test case in main does:
When I run the demo, sometimes it completes successfully. Other times the purge operation doesn't see that the first task was updated and skips it, leading to the queue getting in a bad state.
I have the update and purge operations in separate transactions, and have even put in sleep commands between, but it isn't stable.
Am I missing any API options/parameters that will make these Firestore operations stable in the way I'm expecting?
Is Firestore not able to be used as a database in this way?
Thank you in advance
Following are the outputs, and then the demo files
Successful run output:
MAIN: Testing with document ID: test-document-3745189a-101d-49d3-a4a7-f50c0cf2178e
MAIN: Creating document in Firestore: queue size: 0
CLIENT: Creating missing document in Firestore with default metadata for a task queue
CLIENT: Getting the newest task
CLIENT: Task queue is empty
MAIN: Enqueuing 2 tasks: queue size: 0
CLIENT: Enqueueing task with ID task-02055bc2-9e25-40e0-8b8c-b6274002517f
CLIENT: Enqueueing task with ID task-0b8c522c-920d-41b7-b0a1-3e5e05d5ef90
CLIENT: Getting the newest task
MAIN: Updating first task status to COMPLETED: queue size: 2
CLIENT: Found 1 completed tasks
CLIENT: Can get info from the last completed task: QueueTask()
CLIENT: Deleting 1 completed tasks
CLIENT: Getting the newest task
MAIN: finished: queue size: 1
Failed run output:
MAIN: Testing with document ID: test-document-d468df49-8791-4958-882c-58a61c629252
MAIN: Creating document in Firestore: queue size: 0
CLIENT: Creating missing document in Firestore with default metadata for a task queue
CLIENT: Getting the newest task
CLIENT: Task queue is empty
MAIN: Enqueuing 2 tasks: queue size: 0
CLIENT: Enqueueing task with ID task-f564785b-70a7-427e-9844-899bdaf6144f
CLIENT: Enqueueing task with ID task-9546a329-9c2b-4b45-8867-3faabd0467e8
CLIENT: Getting the newest task
MAIN: Updating first task status to COMPLETED: queue size: 2
CLIENT: Found 0 completed tasks
CLIENT: No completed tasks to delete
CLIENT: Getting the newest task
Traceback (most recent call last):
File "firestore_transactions/main.py", line 81, in <module>
run_test()
File "firestore_transactions/main.py", line 76, in run_test
assert_msg(client.get_queue_size(document_id), 1)
File "firestore_transactions/main.py", line 27, in assert_msg
raise AssertionError(f"Expected: {expected}, but got: {actual}")
AssertionError: Expected: 1, but got: 2
main.py:
import typing
import uuid
from firestore_queue_client import FirestoreQueueClient
from queue_task import QueueTask, QueueTaskStatus
def generate_queue_task(document_id: str) -> QueueTask:
"""
Generate a queue task with a unique ID and default status.
"""
task_id = f"task-{uuid.uuid4()}"
return QueueTask(
document_id=document_id or f"document-{uuid.uuid4()}",
task_id=task_id,
task_status=QueueTaskStatus.PROCESSING,
)
def assert_msg(actual: typing.Any, expected: typing.Any) -> None:
"""
Assert that the actual value matches the expected value.
"""
if actual != expected:
raise AssertionError(f"Expected: {expected}, but got: {actual}")
def run_test() -> None:
Initialize the work queue client
client = FirestoreQueueClient(
project_id="PROJECT_ID",
database="DATABASE_ID",
collection_name="COLLECTION_ID",
sub_collection_name="queue",
)
# Generate a unique document ID for testing
document_id = f"test-document-{uuid.uuid4()}"
print(f"MAIN: Testing with document ID: {document_id}")
task_01: QueueTask = generate_queue_task(document_id=document_id)
task_02: QueueTask = generate_queue_task(document_id=document_id)
print(f"MAIN: Creating document in Firestore: queue size: {client.get_queue_size(document_id)}")
client.ensure_firestore_queue_exists(document_id=document_id)
assert_msg(client.get_latest_task(document_id), None)
assert_msg(client.get_queue_size(document_id), 0)
print(f"MAIN: Enqueuing 2 tasks: queue size: {client.get_queue_size(document_id)}")
client.enqueue_task(queue_task=task_01)
client.enqueue_task(queue_task=task_02)
assert_msg(
client.get_latest_task(document_id),
task_02,
)
assert_msg(client.get_queue_size(document_id), 2)
print(f"MAIN: Updating first task status to COMPLETED: queue size: {client.get_queue_size(document_id)}")
client.update_task_status(
document_id=document_id,
task_id=task_01.task_id,
status=QueueTaskStatus.COMPLETED,
)
assert_msg(
client.get_latest_task(document_id),
task_02,
)
assert_msg(client.get_queue_size(document_id), 1)
print(f"MAIN: finished: queue size: {client.get_queue_size(document_id)}")
if __name__ == "__main__":
run_test()
firestore_queue_client.py:
from typing import Optional, List
from google.cloud import firestore
from queue_task import QueueTask, QueueTaskStatus, get_utc_timestamp_now
class FirestoreQueueClient:
def __init__(
self,
project_id: str,
database: str,
collection_name: str,
sub_collection_name: str,
):
self.project_id = project_id
self.database = database
self.collection_name = collection_name
self.sub_collection_name = sub_collection_name
"""Initialize the Firestore connection."""
self.db = firestore.Client(
project=self.project_id,
database=self.database,
)
def ensure_firestore_queue_exists(
self,
document_id: str,
) -> None:
"""
Ensure the document exists in Firestore, creating it if missing.
"""
with self.db.transaction() as transaction:
_ensure_firestore_queue_exists(
transaction=transaction,
db=self.db,
collection_name=self.collection_name,
document_id=document_id,
)
def enqueue_task(
self,
queue_task: QueueTask,
) -> None:
"""
Enqueue a task in the Firestore queue.
"""
with self.db.transaction() as transaction:
_enqueue_task_transaction(
transaction=transaction,
db=self.db,
collection_name=self.collection_name,
sub_collection_name=self.sub_collection_name,
queue_task=queue_task,
)
def update_task_status(
self,
document_id: str,
task_id: str,
status: QueueTaskStatus,
) -> None:
"""
Update the status of a task in the Firestore queue.
"""
with self.db.transaction() as transaction:
# set the status of the task
_update_task_status_transaction(
transaction=transaction,
db=self.db,
collection_name=self.collection_name,
sub_collection_name=self.sub_collection_name,
document_id=document_id,
task_id=task_id,
task_status=status,
)
with self.db.transaction() as transaction:
# run the purge function in case this completed task is at the front of the queue
_purge_completed_and_set_status(
transaction=transaction,
db=self.db,
collection_name=self.collection_name,
sub_collection_name=self.sub_collection_name,
document_id=document_id,
)
def get_latest_task(
self,
document_id: str,
) -> Optional[QueueTask]:
"""
Get the last expected task of the document from Firestore.
"""
with self.db.transaction() as transaction:
return _get_newest_task_transaction(
transaction=transaction,
db=self.db,
collection_name=self.collection_name,
sub_collection_name=self.sub_collection_name,
document_id=document_id,
)
def get_queue_size(self, document_id: str) -> int:
"""
Get the size of the queue for a given document.
"""
with self.db.transaction() as transaction:
return _get_queue_size_transaction(
transaction=transaction,
db=self.db,
collection_name=self.collection_name,
sub_collection_name=self.sub_collection_name,
document_id=document_id,
)
@firestore.transactional
def _ensure_firestore_queue_exists(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
document_id: str,
) -> firestore.DocumentReference:
"""
Ensure the document exists in Firestore, creating it if missing.
"""
doc_reference = db.collection(collection_name).document(document_id)
doc_snapshot = doc_reference.get(transaction=transaction)
if not doc_snapshot.exists:
print(
"CLIENT: Creating missing document in Firestore with default metadata for a task queue"
)
# Must create the document with some metadata. Otherwise, adding a subcollection will cause the document to be a phantom
transaction.set(doc_reference, {"key": "value"})
return doc_reference
@firestore.transactional
def _enqueue_task_transaction(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
sub_collection_name: str,
queue_task: QueueTask,
) -> None:
# Get a task reference
task_ref = (
db.collection(collection_name)
.document(queue_task.document_id)
.collection(sub_collection_name)
.document(queue_task.task_id)
)
# Check if a task with this ID already exists
task_doc = task_ref.get(transaction=transaction)
if task_doc.exists:
raise ValueError(f"CLIENT: Task with ID {queue_task.task_id} already exists")
# write the task
print(f"CLIENT: Enqueueing task with ID {queue_task.task_id}")
transaction.set(reference=task_ref, document_data=queue_task.to_dict())
@firestore.transactional
def _get_newest_task_transaction(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
sub_collection_name: str,
document_id: str,
) -> Optional[QueueTask]:
print("CLIENT: Getting the newest task")
return _get_newest_task(
transaction=transaction,
db=db,
collection_name=collection_name,
sub_collection_name=sub_collection_name,
document_id=document_id,
)
@firestore.transactional
def _purge_completed_and_set_status(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
sub_collection_name: str,
document_id: str,
) -> None:
# Query the subcollection for current tasks ordered by creation date
ordered_tasks = (
db.collection(collection_name)
.document(document_id)
.collection(sub_collection_name)
.order_by(
QueueTask.DB_ORDER_BY, direction=QueueTask.DB_ORDER_DIRECTION_OLD_FIRST
)
.get(transaction=transaction)
)
# variables for tracking during loop
last_completed_task: Optional[QueueTask] = None
references_to_delete: List[firestore.DocumentReference] = []
# Going in order of creation date, delete completed tasks until we find the last task that is completed
# this task will be the one we want to set the status for
for reference in ordered_tasks:
queue_task = QueueTask.from_dict(reference.to_dict())
if queue_task.task_status == QueueTaskStatus.COMPLETED:
references_to_delete.append(reference.reference)
last_completed_task = queue_task
else:
break
print(f'CLIENT: Found {len(references_to_delete)} completed tasks')
if last_completed_task:
print(
f"CLIENT: Can get info from the last completed task: {last_completed_task}"
)
# Delete the completed tasks
if references_to_delete:
print(f"CLIENT: Deleting {len(references_to_delete)} completed tasks")
for reference in references_to_delete:
transaction.delete(reference)
else:
print("CLIENT: No completed tasks to delete")
@firestore.transactional
def _update_task_status_transaction(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
sub_collection_name: str,
document_id: str,
task_id: str,
task_status: QueueTaskStatus,
) -> None:
task_ref = (
db.collection(collection_name)
.document(document_id)
.collection(sub_collection_name)
.document(task_id)
)
task = task_ref.get(transaction=transaction)
if not task.exists:
raise ValueError(f"CLIENT: Task with ID {task_id} not found")
else:
queue_task = QueueTask.from_dict(task.to_dict())
if queue_task.task_status == task_status:
print(f"CLIENT: Task with ID {task_id} already has status {task_status}")
else:
queue_task.task_status = task_status
queue_task.updated_at = get_utc_timestamp_now()
transaction.update(task_ref, queue_task.to_dict())
def _get_newest_task(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
sub_collection_name: str,
document_id: str,
) -> Optional[QueueTask]:
result = None
# Query for the most recently created task. Should be the last one to complete
query = (
db.collection(collection_name)
.document(document_id)
.collection(sub_collection_name)
.order_by(
QueueTask.DB_ORDER_BY, direction=QueueTask.DB_ORDER_DIRECTION_NEW_FIRST
)
.limit(1)
)
tasks = query.get(transaction=transaction)
if not tasks:
print("CLIENT: Task queue is empty")
else:
result = QueueTask.from_dict(tasks[0].to_dict())
return result
@firestore.transactional
def _get_queue_size_transaction(
transaction: firestore.Transaction,
db: firestore.Client,
collection_name: str,
sub_collection_name: str,
document_id: str,
) -> int:
# Query the subcollection for current tasks ordered by creation date
ordered_tasks = (
db.collection(collection_name)
.document(document_id)
.collection(sub_collection_name)
.get(transaction=transaction)
)
return len(ordered_tasks)
and queue_task.py:
import dataclasses
import datetime
from enum import Enum
import typing
class QueueTaskStatus(Enum):
PROCESSING = "processing"
COMPLETED = "completed"
@staticmethod
def from_string(status: str) -> "QueueTaskStatus":
for status_enum in QueueTaskStatus:
if status_enum.value == status:
return status_enum
raise ValueError(f"Unknown status: {status}")
def get_utc_timestamp_now() -> str:
# timestamp in a format where alphabetical order is also chronological order
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S%z")
@dataclasses.dataclass
class QueueTask:
# Field names for Firestore
FN_DOCUMENT_ID: typing.ClassVar[str] = "document_id"
FN_TASK_ID: typing.ClassVar[str] = "task_id"
FN_TASK_STATUS: typing.ClassVar[str] = "task_status"
FN_CREATED_AT: typing.ClassVar[str] = "created_at"
# Task ordering keys used in Firestore
DB_ORDER_BY: typing.ClassVar[str] = FN_CREATED_AT
DB_ORDER_DIRECTION_NEW_FIRST: typing.ClassVar[str] = "DESCENDING"
DB_ORDER_DIRECTION_OLD_FIRST: typing.ClassVar[str] = "ASCENDING"
def __init__(
self,
document_id: str,
task_id: str,
task_status: QueueTaskStatus = QueueTaskStatus.PROCESSING,
created_at: typing.Optional[str] = None,
):
self.document_id: str = document_id
self.task_id: str = task_id
self.task_status: QueueTaskStatus = task_status
self.created_at: str = created_at or get_utc_timestamp_now()
def to_dict(self) -> typing.Dict[str, str]:
return {
self.FN_DOCUMENT_ID: self.document_id,
self.FN_TASK_ID: self.task_id,
self.FN_TASK_STATUS: self.task_status.value,
self.FN_CREATED_AT: self.created_at,
}
@staticmethod
def from_dict(data: dict[str, typing.Any]) -> "QueueTask":
return QueueTask(
document_id=data[QueueTask.FN_DOCUMENT_ID],
task_id=data[QueueTask.FN_TASK_ID],
task_status=QueueTaskStatus.from_string(data[QueueTask.FN_TASK_STATUS]),
created_at=data[QueueTask.FN_CREATED_AT],
)
The issue was simple. The code ordered by the timestamp, but the timestamp only had seconds of resolution.
When generating the timestamps, switching from the custom format string to isoformat:
def get_utc_timestamp_now() -> str:
# timestamp in a format where alphabetical order is also chronological order
return datetime.datetime.now(datetime.timezone.utc).isoformat()