pythonasynchronouspython-asynciopython-triomastodon-py

Asynchronous Streaming of Notifications with Mastodon.py


I am attempting to asynchronously stream notifications from Mastodon using Mastodon.py. To run the asynchronous code, I'm attempting to use Trio

I've tried the following:

...
from mastodon import Mastodon, StreamListener
...

def main():
    ...
    access_token = os.getenv("MASTODON_ACCESS_TOKEN")
    api_base_url = os.getenv("MASTODON_API_BASE_URL")

    # Login
    mastodon = Mastodon(
        access_token=access_token,
        api_base_url=api_base_url
    )
    # Show username
    logger.info(f"Logged in as {mastodon.account_verify_credentials()['username']}")
    ...

    logger.info("Starting user stream")
    user = mastodon.stream_user(TheStreamListener())
    logger.info("Started user stream")
    try:
        trio.run(user)
    except KeyboardInterrupt:
        logger.info("Stopping user stream")
        user.close()
        logger.info("Stopped user stream")

class TheStreamListener(StreamListener):
    def on_update(self, status):
        logger.info(f"Got update: {status['content']}")
        ...
    def on_notification(self, notification):
        if notification['type'] == 'mention':
            logger.opt(colors=True).info(f"Got <blue>mention</blue> from {notification['account']['username']}") # noqa E501
        ...
main()

I expected this to log Started user stream after running trio.run(). However, it seems the stream is not asynchronous as the last message logged is Starting user stream. I'm aware there is a run_async parameter for mastodon.stream_user() but have been unable to get it to run asynchronously.

I've tried mastodon.stream_user(TheStreamListener(), run_async=True, reconnect_async=True). However, I'm unsure how I can keep the stream running whilst running other code as the stream exits immediately.


Solution

  • I ended up implementing the following code successfully:

    def main():
        logger.info("Starting user stream")
        stream = mastodon.stream_user(TheStreamListener(), 
        run_async=True)logger.info("Started user stream")
        trio.run(sleep_or_not, stream)
    async def sleep_or_not(stream):
        # Experimenting with running other code while the stream is running
        try:
            async with trio.open_nursery() as nursery:
                nursery.start_soon(trio.sleep_forever)
                nursery.start_soon(print_time)
        except KeyboardInterrupt:
            ...