pythonpostgresqlceleryfastapicelerybeat

Update query not working with Celery and FastAPI


I am using a FastAPI repository pattern, and it works very well in Celery, but I have an Update query it doesn't execute. (The funny part is that, other queries work very well).

Here's my query:

UPDATE_REMINDER_SENT_AT_QUERY = """
UPDATE public.trainings
SET reminder_sent_at = :next_reminder_date  AT TIME ZONE 'utc'
WHERE id = :training_id
RETURNING id;
"""

Then here's my repository:

 async def update_reminder_sent_at(
        self, training_id: UUID, next_reminder_date
    ):
        async with self.db.transaction():
            record: int = await self.db.fetch_val(
                query=UPDATE_REMINDER_SENT_AT_QUERY,
                values={
                    "training_id": training_id,
                    "next_reminder_date": next_reminder_date,
                },
            )
            logger.info(f'record {record}')

And the service:

async def update_reminder_sent_at(
    self, training_id: UUID, next_reminder_date
):
    await self.repository.update_reminder_sent_at(training_id, next_reminder_date)

Then finally called in a Celery task:

@app_celery.task
def task_update(
    training_id: UUID, user_id: UUID, tenant_id: UUID
):
    """Send email reminders for a specific training and user."""

    async def send_reminder(training_id: UUID, user_id: UUID, tenant_id: UUID):
         next_reminder_date = datetime.utcnow() + timedelta(
                    days=training.reminder.schedule_in_days
                )
                await training_service.update_reminder_sent_at(
                    training_id, next_reminder_date
                )

      asyncio.run(send_reminder(training_id, user_id, tenant_id))

So when I logged in the repository: logger.info(f'record {record}'), I get something like this:

celery_worker       | 2024-06-15 12:00:00.388 |INFO     | api.trainings.repository:update_reminder_sent_at:163 - record d1d33ada-1e98-40e7-982c-f553b09bcaa0 

Surprisingly checking the db, no effect made on the field. (The field didn't update at all).

laas_api=# select id, reminder_sent_at  from trainings
laas_api-# ;
                  id                  | reminder_sent_at
--------------------------------------+------------------
 b8b6b8d3-20ca-4ced-9720-4b6585f116d9 |
 3f9e08af-dacf-4979-97be-bbe3f86c5979 |
 fba1a12d-a8d9-418d-851a-a55e29c02996 |
 679596a0-264c-4dad-aca2-37c197534621 |
 24d3469d-bfa7-43f0-95b9-996105af6df0 |
 45213723-0711-4d86-97ef-d9c7528bcebd |
 d1d33ada-1e98-40e7-982c-f553b09bcaa0 |
(7 rows)

Note

Here's how my db is declared:

@asynccontextmanager
async def get_db():
    database = Database(
        settings.database_url, force_rollback=True, min_size=3, max_size=20
    )
    await database.connect()
    try:
        yield database
    finally:
        await database.disconnect()

Solution

  • We solved it in the comments, but for anyone encountering a similar issue, here is the answer:

    You always have to commit your data in order to persist them on the database. In your get_db function you don't do any commit. Moreover, you configure your connection object with force_rollback=True which causes all your changes to rollback upon closing.

    From the documentation:

    This will ensure that all database connections are run within a transaction that rollbacks once the database is disconnected.

    So removing this flag plus a commit before disconnecting should resolve your issue.