Tags: python, python-asyncio, aiohttp, apscheduler

How to use aiohttp with apscheduler?


I would like to fetch several web pages periodically all within the same aiohttp.ClientSession(). Here is what I have got so far. The URLs need to remain within the jobs, because some other URLs will need to be calculated.

What command is missing in the place of ???. Or do I need to do this in a completely different way? Thanks in advance for your help.

P.S.: The seconds interval is for testing purposes only. Later, I will change it to a one minute interval.

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
import aiohttp

async def fetch(session, url, timeout=3):
    # GET `url` through the shared aiohttp session; `timeout` is in seconds.
    # Returns (body_text, http_status).
    async with session.get(url, ssl=False, timeout=timeout) as response:
        return await response.text(), response.status

async def GOESX_job(session):
    # Scheduled job: fetch the GOES X-ray flux JSON feed and print the HTTP status.
    url = 'https://services.swpc.noaa.gov/json/goes/primary/xrays-6-hour.json'
    response, status = await fetch(session, url)
    print('GOESX', status)

async def GOESp_job(session):
    # Scheduled job: fetch the GOES integral-proton JSON feed and print the HTTP status.
    url = 'https://services.swpc.noaa.gov/json/goes/primary/integral-protons-6-hour.json'
    response, status = await fetch(session, url)
    print('GOESp', status)

async def jobs(scheduler):
    # NOTE(review): the session is created in an `async with` block, so it is
    # closed as soon as this coroutine returns -- i.e. before any of the
    # scheduled interval jobs actually run. This lifetime problem is the
    # crux of the question.
    async with aiohttp.ClientSession() as session:
        scheduler.add_job(GOESX_job, 'interval', seconds=5, args=[session])
        scheduler.add_job(GOESp_job, 'interval', seconds=10, args=[session])

# `???` below is the question's placeholder: the asker wants to know what
# goes here to drive the `jobs` coroutine before starting the scheduler.
scheduler = AsyncIOScheduler()
??? jobs(scheduler)

scheduler.start()
asyncio.get_event_loop().run_forever()

Solution

  • This example code, which can be found on page 119 of the highly recommended booklet Python Asyncio Jump-Start by Jason Brownlee, inspired me to come up with my own answer.

    My answer also works around the current apscheduler limitation that an interval job cannot be made to fire immediately at startup: each task fetches once right away and then waits for its scheduler-driven event.

    I also added a bit of code to gracefully exit the loop by closing the aiohttp.ClientSession().

    #! /usr/bin/env python3
    
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    import asyncio
    import aiohttp
    
    async def fetch(session, url, timeout=3):
        """Fetch *url* via the shared *session*; return (body_text, http_status)."""
        async with session.get(url, ssl=False, timeout=timeout) as resp:
            body = await resp.text()
            return body, resp.status
    
    async def GOESX_task(event, session):
        """Fetch the GOES X-ray feed once at startup and then once per *event* pulse."""
        feed = 'https://services.swpc.noaa.gov/json/goes/primary/xrays-6-hour.json'
        while True:
            print('GOESX')
            _, status = await fetch(session, feed)
            print('GOESX', status)
            # Block until the scheduler sets the event, then re-arm it.
            await event.wait()
            event.clear()
    
    async def GOESp_task(event, session):
        """Fetch the GOES proton feed once at startup and then once per *event* pulse."""
        feed = 'https://services.swpc.noaa.gov/json/goes/primary/integral-protons-6-hour.json'
        while True:
            print('GOESp')
            _, status = await fetch(session, feed)
            print('GOESp', status)
            # Block until the scheduler sets the event, then re-arm it.
            await event.wait()
            event.clear()
    
    async def main():
        """Run both fetch tasks forever, pulsed by an AsyncIOScheduler.

        Each task fetches immediately on startup (working around apscheduler's
        inability to fire an interval job right away) and then waits on its
        asyncio.Event, which the scheduler sets periodically.
        """
        GOESX_trigger = asyncio.Event()
        GOESp_trigger = asyncio.Event()

        # `async with` guarantees the session is closed on every exit path,
        # replacing the original manual try/finally.
        async with aiohttp.ClientSession() as session:
            tasks = [
                asyncio.create_task(GOESX_task(GOESX_trigger, session)),
                asyncio.create_task(GOESp_task(GOESp_trigger, session)),
            ]

            await asyncio.sleep(0)    # Suspends main, allowing tasks to start.

            scheduler = AsyncIOScheduler()
            scheduler.add_job(GOESX_trigger.set, 'interval', seconds=5)
            scheduler.add_job(GOESp_trigger.set, 'interval', seconds=10)
            scheduler.start()

            # gather (unlike asyncio.wait) re-raises a task's exception, so a
            # failed fetch (e.g. a timeout) surfaces here instead of leaving a
            # dead task whose exception is never retrieved.
            await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        # Run the event loop until interrupted; Ctrl-C exits cleanly because
        # main() is responsible for closing the session on the way out.
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            pass