I am using the repository pattern with FastAPI, and it works well inside Celery tasks, but one UPDATE query has no effect. (The odd part is that my other queries work fine.)
Here's my query:
UPDATE_REMINDER_SENT_AT_QUERY = """
UPDATE public.trainings
SET reminder_sent_at = :next_reminder_date AT TIME ZONE 'utc'
WHERE id = :training_id
RETURNING id;
"""
Then here's my repository:
async def update_reminder_sent_at(
    self, training_id: UUID, next_reminder_date
):
    async with self.db.transaction():
        record: int = await self.db.fetch_val(
            query=UPDATE_REMINDER_SENT_AT_QUERY,
            values={
                "training_id": training_id,
                "next_reminder_date": next_reminder_date,
            },
        )
        logger.info(f'record {record}')
And the service:
async def update_reminder_sent_at(
    self, training_id: UUID, next_reminder_date
):
    await self.repository.update_reminder_sent_at(training_id, next_reminder_date)
Then finally called in a Celery task:
@app_celery.task
def task_update(
    training_id: UUID, user_id: UUID, tenant_id: UUID
):
    """Send email reminders for a specific training and user."""
    async def send_reminder(training_id: UUID, user_id: UUID, tenant_id: UUID):
        next_reminder_date = datetime.utcnow() + timedelta(
            days=training.reminder.schedule_in_days
        )
        await training_service.update_reminder_sent_at(
            training_id, next_reminder_date
        )
    asyncio.run(send_reminder(training_id, user_id, tenant_id))
When I log the result in the repository with logger.info(f'record {record}'), I get something like this:
celery_worker | 2024-06-15 12:00:00.388 |INFO | api.trainings.repository:update_reminder_sent_at:163 - record d1d33ada-1e98-40e7-982c-f553b09bcaa0
Surprisingly, when I check the database, the field has not been updated at all:
laas_api=# select id, reminder_sent_at from trainings
laas_api-# ;
                  id                  | reminder_sent_at
--------------------------------------+------------------
 b8b6b8d3-20ca-4ced-9720-4b6585f116d9 |
 3f9e08af-dacf-4979-97be-bbe3f86c5979 |
 fba1a12d-a8d9-418d-851a-a55e29c02996 |
 679596a0-264c-4dad-aca2-37c197534621 |
 24d3469d-bfa7-43f0-95b9-996105af6df0 |
 45213723-0711-4d86-97ef-d9c7528bcebd |
 d1d33ada-1e98-40e7-982c-f553b09bcaa0 |
(7 rows)
Here's how my db is declared:
@asynccontextmanager
async def get_db():
    database = Database(
        settings.database_url, force_rollback=True, min_size=3, max_size=20
    )
    await database.connect()
    try:
        yield database
    finally:
        await database.disconnect()
We solved it in the comments, but for anyone encountering a similar issue, here is the answer:
You always have to commit your changes in order to persist them in the database, and your get_db function never commits. Moreover, you create the Database object with force_rollback=True, which causes all your changes to be rolled back when the connection is closed.
From the documentation:
This will ensure that all database connections are run within a transaction that rollbacks once the database is disconnected.
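So removing this flag, and making sure the update is committed before disconnecting, should resolve your issue. In the repository above, the async with self.db.transaction() block should already take care of the commit once the flag is gone, since it commits when the block exits without an error.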
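
To illustrate why the log can show the returned id while the table stays unchanged, here is a minimal sketch. It uses the standard library's sqlite3 instead of databases and PostgreSQL, purely so that it runs without a server, so treat it as an analogy rather than the exact code path. The table layout, the helper names update_reminder, read_reminder and demo, and the timestamp are hypothetical. The explicit rollback stands in for what force_rollback=True does when the database is disconnected.

```python
import os
import sqlite3
import tempfile


def update_reminder(db_path: str, training_id: str, sent_at: str, *, commit: bool) -> int:
    """Run the UPDATE, then either commit or roll back before closing."""
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute(
            "UPDATE trainings SET reminder_sent_at = ? WHERE id = ?",
            (sent_at, training_id),
        )
        matched = cur.rowcount  # the statement itself succeeds either way
        if commit:
            conn.commit()
        else:
            conn.rollback()  # stand-in for force_rollback=True at disconnect
        return matched
    finally:
        conn.close()


def read_reminder(db_path: str, training_id: str):
    """Read the stored value through a fresh connection."""
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT reminder_sent_at FROM trainings WHERE id = ?",
            (training_id,),
        ).fetchone()
        return row[0]
    finally:
        conn.close()


def demo():
    """Return (rows matched, stored value) for the rollback and commit cases."""
    training_id = "d1d33ada-1e98-40e7-982c-f553b09bcaa0"
    sent_at = "2024-06-22 12:00:00"  # hypothetical reminder timestamp
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.sqlite3")
        conn = sqlite3.connect(db_path)
        conn.execute(
            "CREATE TABLE trainings (id TEXT PRIMARY KEY, reminder_sent_at TEXT)"
        )
        conn.execute("INSERT INTO trainings (id) VALUES (?)", (training_id,))
        conn.commit()
        conn.close()

        rolled_back = (
            update_reminder(db_path, training_id, sent_at, commit=False),
            read_reminder(db_path, training_id),
        )
        committed = (
            update_reminder(db_path, training_id, sent_at, commit=True),
            read_reminder(db_path, training_id),
        )
    return rolled_back, committed


if __name__ == "__main__":
    print(demo())
```

In both cases the statement matches one row, which is why the repository log looks healthy. Only the committed case leaves a value behind for a later SELECT to see.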