pythonflaskapscheduler

How to dynamically schedule tasks using Flask Requests (APScheduler)?


I am building a smart-home web-application from scratch using flask as the backend.

I want to be able to create a new task in the frontend (e.g. turn off light at 11pm), which then gets passed to the backend using a request as following:

@app.route("/create-task/")
def createTask():
    # create a scheduled task based on the users input
    return "something"

I looked at other posts and came across APScheduler as a way of automating tasks with Flask. However, I have struggled to find solutions on how to implement that with this dynamic task creation I had in mind.

How can I do this?


Solution

  • You can implement a simple form of task scheduling using no additional libraries other than threading.

    To do that, you will need

    1. a task storage(e.g. an SQL database)

    2. a background thread whose target is a function running an infinite loop that checks whether any task should have been completed but has not been completed yet

    I will put all the code in one script, but please do not do the same and split it out into separate modules

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy as SQLA
    from sqlalchemy import Integer, String, DateTime, Boolean
    from sqlalchemy.orm import DeclarativeBase
    from datetime import datetime, now
    from time import sleep
    from threading import Thread as T
    
    app = Flask(__name__)
    db = SQLAlchemy(app)
    app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite://test.db"
    
    class Task(DeclarativeBase):
        id = Column(Integer, primary_key=True)
        name = Column(String)
        execution_time = Column(DateTime)
        done = Column(Boolean, default=False)
    
    @app.route("/create-task/")
    def create_task():
        new_task = Task(name="some task", \
            execution_time=datetime(year=2025, month=6, day=18, hour=23))
        return "something"
    
    def task_checker():
        while True:
            sleep(10)
            tasks = Task.query.all()
            for task in tasks:
                formatted_exec_time = task.execution_time.strftime('%Y:%m:%d:%H:%M:%S')
                formatted_now_time = now.strftime('%Y:%m:%d:%H:%M:%S')
                if formatted_exec_time<formatted_now_time and not task.done:
                    # do the task
                    print(f'task {task.name} done')
                    task.done = True
            db.session.commit()
    
    
    background_thread = T(name='background', target=task_checker, daemon=True)
    if __name__ == '__main__':
        db.create_all()
        app.run(debug=True)
        background_thread.start()