Tags: python, python-3.x, playwright, playwright-python, prefect

Use async_playwright within a multithreaded Prefect flow (`_channel` seems to be `None`)


I am having trouble using Playwright in my Prefect flow. The flow has to scrape ~800 URLs, so it splits them into chunks of 200 and runs those chunks in parallel. The URLs within a chunk are handled one after another.
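The chunking described above can be sketched in plain Python as follows. The URLs are hypothetical placeholders, and only the split and re-flatten steps are shown; the parallel scraping itself is left out.

```python
def split_into_chunks(urls, size):
    """Split urls into consecutive chunks of at most `size` items, keeping order."""
    return [urls[i:i + size] for i in range(0, len(urls), size)]


urls = [f"https://example.com/page/{i}" for i in range(800)]  # hypothetical URLs
chunks = split_into_chunks(urls, 200)
print(len(chunks), [len(c) for c in chunks])  # 4 [200, 200, 200, 200]

# After the parallel step, per-chunk results are flattened back in the original order.
flat = [u for chunk in chunks for u in chunk]
print(flat == urls)  # True
```

A list that is not a multiple of 200 simply yields a shorter final chunk.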

I learned that I have to use `async_playwright` in my setup. I also learned that Playwright is not thread-safe, so I have to create one Playwright instance per thread.
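A minimal sketch of the "one instance per thread" rule, using only the standard library. `LoopBoundClient` is a hypothetical stand-in for a Playwright instance (its loop check is an artificial guard imitating loop affinity, not Playwright's API), and `ThreadPoolExecutor` stands in for Prefect's thread-based task runner. Each worker thread creates, uses, and discards its own client inside a single `asyncio.run()` call.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class LoopBoundClient:
    """Hypothetical stand-in for a Playwright instance: it records the event
    loop it was created in and refuses to be used from any other loop."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()

    async def fetch_title(self, url):
        # Artificial guard imitating loop affinity (not a real Playwright check).
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("client used outside the loop that created it")
        await asyncio.sleep(0)  # placeholder for real network I/O
        return f"title of {url}"


async def scrape_chunk(urls):
    # Create, use and drop the client inside the same coroutine / event loop.
    client = LoopBoundClient()
    return [await client.fetch_title(u) for u in urls]


def run_chunk(urls):
    # Runs in a worker thread: one fresh event loop per chunk.
    return asyncio.run(scrape_chunk(urls))


urls = [f"https://example.com/page/{i}" for i in range(6)]  # hypothetical URLs
chunks = [urls[i:i + 2] for i in range(0, len(urls), 2)]

with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
    results = list(pool.map(run_chunk, chunks))  # chunks run in parallel threads

titles = [t for chunk in results for t in chunk]
print(titles[:2])  # ['title of https://example.com/page/0', 'title of https://example.com/page/1']
```

The key design choice is that the client is created inside `scrape_chunk`, the same coroutine that uses it; creating it beforehand in a separate `asyncio.run()` call would trip the guard.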

I think my error comes from the line `return from_channel(await self._channel.send("newPage"))` in Playwright's `_browser_context.py`, with `self._channel` being `None`. I spent several days on this but couldn't fix it. Can you help me out?

Simplified code example:

from playwright.async_api import async_playwright
import asyncio
import re
from prefect import task, flow
from prefect.cache_policies import NONE

# Each thread uses its own Playwright instance as Playwright is not thread-safe
class SpiderThread:
    def __init__(self, p, browser, context):
        self.p = p
        self.browser = browser
        self.context = context

    @task(cache_policy=NONE) # Cache policy needed to disable pickling
    def run(self): # This will later run through hundreds of websites
        page = asyncio.run(self.context.new_page())
        response = asyncio.run(page.goto("https://www.ard.de/"))
        extracted_title = re.search('<title>(.*)</title>', response.text()).group(1)
        # Should print/return "ARD"
        print(extracted_title)
        return extracted_title

    @classmethod
    async def create(cls):
        p = await async_playwright().start()
        chromium = p.chromium
        browser = await chromium.launch()
        context = await browser.new_context()
        return SpiderThread(p, browser, context)

# This is the main flow that runs multiple SpiderThreads in parallel
@flow(name="Main")
def main_flow():
    threads= [asyncio.run(SpiderThread.create()), asyncio.run(SpiderThread.create())]
    print(f"Threads: {threads}")

    prefect_threads = []
    for t in threads:
        prefect_threads.append(t.run.submit())
    # Wait for all of them to finish
    print(f"Prefect threads: {prefect_threads}")
    results = [prefect_thread.result() for prefect_thread in prefect_threads]
    print(results)

if __name__ == "__main__":
    main_flow()

This raises the following error several times:

12:20:29.653 | INFO    | Flow run 'grumpy-meerkat' - Beginning flow run 'grumpy-meerkat' for flow 'Main'
12:20:29.659 | INFO    | Flow run 'grumpy-meerkat' - View at http://127.0.0.1:4200/runs/flow-run/6e87d290-5ffa-43f2-9258-c6c9f6387ffb
Threads: [<__main__.SpiderThread object at 0x000002282AA90FD0>, <__main__.SpiderThread object at 0x000002282AA929B0>]
Prefect threads: [<prefect.futures.PrefectConcurrentFuture object at 0x000002282AE36D70>, <prefect.futures.PrefectConcurrentFuture object at 0x000002282AE37A90>]
12:20:30.795 | ERROR   | Task run 'run-f28' - Task run failed with exception: AttributeError("BrowserContext.new_page: 'NoneType' object has no attribute 'send'") - Retries are exhausted
Traceback (most recent call last):
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 819, in run_context
    yield self
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 1395, in run_task_sync
    engine.call_task_fn(txn)
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 836, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\utilities\callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\mn\IdeaProjects\scraper\playwright-tests-prefect2.py", line 16, in run
    page = asyncio.run(self.context.new_page())
  File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\async_api\_generated.py", line 12787, in new_page
    return mapping.from_impl(await self._impl_obj.new_page())
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_browser_context.py", line 325, in new_page
    return from_channel(await self._channel.send("newPage"))
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 61, in send
    return await self._connection.wrap_api_call(
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 528, in wrap_api_call
    raise rewrite_error(error, f"{parsed_st['apiName']}: {error}") from None
AttributeError: BrowserContext.new_page: 'NoneType' object has no attribute 'send'
12:20:30.796 | ERROR   | Task run 'run-5e4' - Task run failed with exception: AttributeError("BrowserContext.new_page: 'NoneType' object has no attribute 'send'") - Retries are exhausted
Traceback (most recent call last):
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 819, in run_context
    yield self
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 1395, in run_task_sync
    engine.call_task_fn(txn)
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 836, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\utilities\callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\mn\IdeaProjects\scraper\playwright-tests-prefect2.py", line 16, in run
    page = asyncio.run(self.context.new_page())
  File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\async_api\_generated.py", line 12787, in new_page
    return mapping.from_impl(await self._impl_obj.new_page())
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_browser_context.py", line 325, in new_page
    return from_channel(await self._channel.send("newPage"))
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 61, in send
    return await self._connection.wrap_api_call(
  File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 528, in wrap_api_call
    raise rewrite_error(error, f"{parsed_st['apiName']}: {error}") from None
AttributeError: BrowserContext.new_page: 'NoneType' object has no attribute 'send'
12:20:30.813 | ERROR   | Task run 'run-5e4' - Finished in state Failed("Task run encountered an exception AttributeError: BrowserContext.new_page: 'NoneType' object has no attribute 'send'")
[... (multiple times)]

Solution

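  • You create the Playwright objects with `asyncio.run(SpiderThread.create())` in the flow's thread, and `asyncio.run()` closes its event loop as soon as `create()` returns, so the Playwright connection behind `browser` and `context` is bound to a loop that no longer exists. Prefect then executes `run` in a worker thread, where each `asyncio.run(...)` starts yet another loop, so `new_page()` runs on a foreign loop and fails. (The traceback shows that `Channel.send` was entered, so `_channel` itself is not `None`; the `None` is somewhere in Playwright's connection internals, and `wrap_api_call` re-raises a rewritten error `from None`, which hides the frame where it actually happened.)
    Every `asyncio.run()` call creates a fresh loop, which is a recipe for “object created in loop A, used in loop B”.

    The code below should work: each Playwright instance is started, used, and stopped inside one async task, so it never leaves its thread or loop, and `asyncio.run()` is not used at all: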

    from playwright.async_api import async_playwright
    from prefect import flow, task
    from prefect.task_runners import ConcurrentTaskRunner
    
    CHUNK = 200                      # ≤200 URLs per task
    
    @task                            # async task → stays in one loop/thread
    async def scrape(urls):
        titles = []
        # Playwright is started, used and stopped inside this task's own event loop
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            context = await browser.new_context()
            page    = await context.new_page()
            for u in urls:              # URLs within a chunk are visited one by one
                await page.goto(u)
                titles.append(await page.title())   # text of the page's <title>
            await browser.close()
        return titles
    
    @flow(task_runner=ConcurrentTaskRunner())   # chunks run in parallel threads
    def main(urls):
        chunks = [urls[i:i+CHUNK] for i in range(0, len(urls), CHUNK)]
        futures = scrape.map(chunks)            # one task run per chunk
        return [t for fut in futures for t in fut.result()]
    
    if __name__ == "__main__":
        print(main(["https://www.ard.de/"] * 800)[:3])
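To see the “loop A / loop B” failure without Playwright or Prefect, here is a stdlib-only analogy. An `asyncio` future plays the role of the Playwright objects: it is created in the loop of one `asyncio.run()` call and awaited in another. This illustrates the same loop-affinity rule; it is not Playwright's actual code path, and `create_resource` / `use_resource` are hypothetical names.

```python
import asyncio


async def create_resource():
    # Stand-in for SpiderThread.create(): returns an object bound to the running loop.
    loop = asyncio.get_running_loop()
    return loop, loop.create_future()


async def use_resource(fut):
    # Stand-in for asyncio.run(self.context.new_page()) inside the task.
    await fut


# First asyncio.run(): loop A creates the object, then asyncio.run() closes loop A.
loop_a, pending = asyncio.run(create_resource())
loop_closed = loop_a.is_closed()

# Second asyncio.run(): a brand-new loop B tries to await loop A's object.
try:
    asyncio.run(use_resource(pending))
    error_message = ""
except RuntimeError as exc:
    error_message = str(exc)

print(loop_closed)  # True
print(error_message)  # message ends with "attached to a different loop"
```

The fix in the answer follows the same logic: create and use the loop-bound objects inside one coroutine, which is what the async `scrape` task does.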