I'm working on a web application that lets users create events (one-off or recurring) on a calendar. Shortly before an event starts, the system notifies its participants. I'm having trouble designing the flow for these notifications, particularly for recurring events.
A recurring event may have excluded dates (similar to the combination of RRULE and EXDATE).
Users can update an event's time or recurrence rule.
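To make the "recurrence with excluded dates" idea concrete, here is a minimal sketch using only the standard library. It assumes a simple fixed-interval rule; occurs_on and its parameters are hypothetical names for illustration, not part of the application, and a real RRULE can be far richer than this.

```python
from datetime import date, timedelta


def occurs_on(start, interval_days, excluded, day):
    """Return True if a fixed-interval recurrence has an occurrence on `day`.

    `start` is the first occurrence, `interval_days` the gap between
    occurrences, and `excluded` a set of dates to skip (the EXDATE part).
    """
    if day < start or day in excluded:
        return False
    return (day - start).days % interval_days == 0


# Weekly event starting Monday 2015-01-05, with 2015-01-19 excluded.
start = date(2015, 1, 5)
excluded = {date(2015, 1, 19)}
weekly = [
    d
    for d in (start + timedelta(days=n) for n in range(28))
    if occurs_on(start, 7, excluded, d)
]
# weekly == [date(2015, 1, 5), date(2015, 1, 12), date(2015, 1, 26)]
```

A check like this is roughly what a method such as event.get_partial_on would need to answer for a given day.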
The application is written in Python and already uses Celery 3.1 with a Redis broker. Solutions that work with this setup would be nice, though anything will do. From what I have found, it is currently hard to add periodic tasks dynamically with Celery.
A periodic task runs once a day, scans every database, and adds a notification task, scheduled for the appropriate time, for each event that has an occurrence that day.
Each task generated this way has its id saved temporarily in Redis. If a user changes an event's time for that day after its notification task has been scheduled, the task is revoked and replaced with a new one.
Sample code for the above solution:
In tasks.py, all the tasks to run:
from datetime import datetime

from celery.result import AsyncResult
from celery.task import task as celery_task

# ...


@celery_task
def create_notify_task():
    for account in system.query(Account):
        db_session = account.get_session()  # get SQLAlchemy session
        for event in db_session.query(Event):
            schedule_notify_event(account, event)


@celery_task(name='notify_event_users')
def notify_event_users(account_id, event_id):
    # do notification for every event participant
    pass


def schedule_notify_event(account, event):
    # today's occurrence of the event, or None if it does not occur today
    partial_event = event.get_partial_on(datetime.today())
    if partial_event:
        result = notify_event_users.apply_async(
            args=(account.id, event.id),
            eta=partial_event.start)
        replace_task_id(account.id, event.id, result.id)
    else:
        replace_task_id(account.id, event.id, None)


def replace_task_id(account_id, event_id, result_id):
    key = '{}:event'.format(account_id)
    # `redis` is the application's own helper exposing get_client() (not shown)
    client = redis.get_client()
    old_result_id = client.hget(key, event_id)
    if old_result_id:
        AsyncResult(old_result_id).revoke()
    if result_id:
        client.hset(key, event_id, result_id)
    else:
        # nothing scheduled today: drop the entry instead of storing None
        client.hdel(key, event_id)
In event.py:
# when a user changes an event's time
def update_event(event, data):
    # ...
    # update event
    # ...
    # `account` is the account that owns the event (lookup not shown)
    schedule_notify_event(account, event)
Celery setup file:
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'create-notify-every-day': {
        'task': 'tasks.create_notify_task',
        'schedule': crontab(minute=0, hour=0),  # every day at midnight
        'args': (),
    },
}
Some downsides of the above are:
The daily task can take a long time to run. Events in the databases processed last have to wait, and their notifications might be missed. Scheduling the task earlier (e.g. 2 hours before the next day) may alleviate this; however, the first run (or a run after a server restart) is a little awkward to set up.
Care must be taken so that a notify task doesn't get scheduled twice for the same event (e.g. because create_notify_task runs more than once a day).
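One way to soften both downsides, sketched here under stated assumptions, is to scan a short rolling window more often and to "claim" each occurrence before scheduling it, so that repeated or overlapping runs are harmless. The names claim_occurrence, schedule_window and FakeRedis are hypothetical. FakeRedis is an in-memory stand-in so the sketch runs on its own; with a real redis-py client the same set(..., nx=True, ex=...) call maps to Redis SET with the NX and EX options.

```python
from datetime import datetime, timedelta


class FakeRedis:
    """In-memory stand-in for the one Redis call used below."""

    def __init__(self):
        self._data = {}

    def set(self, name, value, nx=False, ex=None):
        # Mimics SET ... NX: refuse to overwrite an existing key.
        # The expiry `ex` is accepted but ignored in this stand-in.
        if nx and name in self._data:
            return None
        self._data[name] = value
        return True


def claim_occurrence(client, account_id, event_id, start, ttl_seconds=2 * 24 * 3600):
    """Return True only for the first caller that claims this occurrence."""
    key = 'notify:{}:{}:{}'.format(account_id, event_id, start.isoformat())
    return bool(client.set(key, '1', nx=True, ex=ttl_seconds))


def schedule_window(client, occurrences, now, horizon=timedelta(hours=2)):
    """Return occurrences starting in [now, now + horizon) not yet claimed."""
    scheduled = []
    for account_id, event_id, start in occurrences:
        in_window = now <= start < now + horizon
        if in_window and claim_occurrence(client, account_id, event_id, start):
            # Real code would call notify_event_users.apply_async(eta=start) here.
            scheduled.append((account_id, event_id, start))
    return scheduled


client = FakeRedis()
now = datetime(2015, 1, 5, 8, 0)
occurrences = [
    (1, 10, datetime(2015, 1, 5, 9, 0)),   # inside the 2-hour window
    (1, 11, datetime(2015, 1, 5, 15, 0)),  # outside the window
]
first = schedule_window(client, occurrences, now)
# A second, overlapping run 30 minutes later schedules nothing new.
second = schedule_window(client, occurrences, now + timedelta(minutes=30))
```

Because the claim key includes the start time, an event whose time is changed gets a fresh claim for its new start. The previously scheduled task would still need to be revoked through its stored task id.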
Is there a more sensible approach to this?
It's been a long time without any answer, and I forgot about this question. Anyway, at the time I went with the following solution. I outline it here in case someone is interested.
Some pros and cons that I could think of: