I am having trouble using Playwright in my Prefect flow. The flow has to scrape ~800 URLs, so it splits them into chunks of 200 and runs those chunks in parallel. The URLs within a chunk are handled one after another.
I learned that I have to use `async_playwright` in my setup. I also learned that Playwright is not thread-safe, so I have to create one Playwright instance per thread.
I think my error comes from the line `return from_channel(await self._channel.send("newPage"))` in Playwright's `_browser_context.py`, with `self._channel` being `None`. I have spent multiple days on this but couldn't fix it. Can you help me out?
Simplified code example:
from playwright.async_api import async_playwright
import asyncio
import re
from prefect import task, flow
from prefect.cache_policies import NONE
# Each thread uses its own Playwright instance as Playwright is not thread-safe
class SpiderThread:
    """Hold one Playwright driver together with a browser and a browser context.

    Instances are built with the async ``create`` classmethod; ``run`` is the
    Prefect task that uses the stored context.
    """

    def __init__(self, p, browser, context):
        # p: the started Playwright driver object
        # browser: the Chromium browser launched from p
        # context: the browser context opened on that browser
        self.p = p
        self.browser = browser
        self.context = context

    @task(cache_policy=NONE) # Cache policy needed to disable pickling
    def run(self): # This will later run through hundreds of websites
        """Open a page, load https://www.ard.de/ and return its <title> text."""
        # NOTE: this is the line named in the quoted traceback. Each
        # asyncio.run() call starts a fresh event loop, so self.context is
        # used on a different loop than the one it was created on.
        page = asyncio.run(self.context.new_page())
        response = asyncio.run(page.goto("https://www.ard.de/"))
        # NOTE(review): with the async API, response.text() presumably returns
        # a coroutine that is never awaited here -- confirm.
        extracted_title = re.search('<title>(.*)</title>', response.text()).group(1)
        # Should print/return "ARD"
        print(extracted_title)
        return extracted_title

    @classmethod
    async def create(cls):
        """Start Playwright, launch Chromium, open a context and wrap all three."""
        p = await async_playwright().start()
        chromium = p.chromium
        browser = await chromium.launch()
        context = await browser.new_context()
        return SpiderThread(p, browser, context)
# This is the main flow that runs multiple SpiderThreads in parallel
@flow(name="Main")
def main_flow():
    """Create two SpiderThreads, submit their ``run`` tasks and print the results."""
    # Each asyncio.run() call below runs SpiderThread.create() on its own
    # event loop, which asyncio.run() closes again before returning.
    threads= [asyncio.run(SpiderThread.create()), asyncio.run(SpiderThread.create())]
    print(f"Threads: {threads}")
    prefect_threads = []
    for t in threads:
        # submit() hands the task to Prefect and returns a future
        prefect_threads.append(t.run.submit())
    # Wait for all of them to finish
    print(f"Prefect threads: {prefect_threads}")
    # result() blocks until the corresponding task run has finished
    results = [prefect_thread.result() for prefect_thread in prefect_threads]
    print(results)
# Run the flow only when this file is executed as a script
if __name__ == "__main__":
    main_flow()
This returns the following error multiple times:
12:20:29.653 | INFO | Flow run 'grumpy-meerkat' - Beginning flow run 'grumpy-meerkat' for flow 'Main'
12:20:29.659 | INFO | Flow run 'grumpy-meerkat' - View at http://127.0.0.1:4200/runs/flow-run/6e87d290-5ffa-43f2-9258-c6c9f6387ffb
Threads: [<__main__.SpiderThread object at 0x000002282AA90FD0>, <__main__.SpiderThread object at 0x000002282AA929B0>]
Prefect threads: [<prefect.futures.PrefectConcurrentFuture object at 0x000002282AE36D70>, <prefect.futures.PrefectConcurrentFuture object at 0x000002282AE37A90>]
12:20:30.795 | ERROR | Task run 'run-f28' - Task run failed with exception: AttributeError("BrowserContext.new_page: 'NoneType' object has no attribute 'send'") - Retries are exhausted
Traceback (most recent call last):
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 819, in run_context
yield self
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 1395, in run_task_sync
engine.call_task_fn(txn)
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 836, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\utilities\callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\mn\IdeaProjects\scraper\playwright-tests-prefect2.py", line 16, in run
page = asyncio.run(self.context.new_page())
File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
return future.result()
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\async_api\_generated.py", line 12787, in new_page
return mapping.from_impl(await self._impl_obj.new_page())
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_browser_context.py", line 325, in new_page
return from_channel(await self._channel.send("newPage"))
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 61, in send
return await self._connection.wrap_api_call(
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 528, in wrap_api_call
raise rewrite_error(error, f"{parsed_st['apiName']}: {error}") from None
AttributeError: BrowserContext.new_page: 'NoneType' object has no attribute 'send'
12:20:30.796 | ERROR | Task run 'run-5e4' - Task run failed with exception: AttributeError("BrowserContext.new_page: 'NoneType' object has no attribute 'send'") - Retries are exhausted
Traceback (most recent call last):
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 819, in run_context
yield self
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 1395, in run_task_sync
engine.call_task_fn(txn)
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\task_engine.py", line 836, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\prefect\utilities\callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\mn\IdeaProjects\scraper\playwright-tests-prefect2.py", line 16, in run
page = asyncio.run(self.context.new_page())
File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\mn\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
return future.result()
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\async_api\_generated.py", line 12787, in new_page
return mapping.from_impl(await self._impl_obj.new_page())
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_browser_context.py", line 325, in new_page
return from_channel(await self._channel.send("newPage"))
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 61, in send
return await self._connection.wrap_api_call(
File "C:\Users\mn\IdeaProjects\scraper\venv\lib\site-packages\playwright\_impl\_connection.py", line 528, in wrap_api_call
raise rewrite_error(error, f"{parsed_st['apiName']}: {error}") from None
AttributeError: BrowserContext.new_page: 'NoneType' object has no attribute 'send'
12:20:30.813 | ERROR | Task run 'run-5e4' - Finished in state Failed("Task run encountered an exception AttributeError: BrowserContext.new_page: 'NoneType' object has no attribute 'send'")
[... (multiple times)]
You build the `BrowserContext` in the main thread, but Prefect then runs the `run` task in another thread. Playwright sees a foreign event loop there, its `_channel` is `None`, and `new_page()` fails.
Calling `asyncio.run()` inside a Prefect task spins up a new event loop on every call – another recipe for “object created in loop A, used in loop B”.
The code below should work: Playwright never leaves its thread, and `asyncio.run()` is not used at all:
from playwright.async_api import async_playwright
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
# Upper bound on how many URLs are sliced into one chunk
CHUNK = 200 # ≤200 URLs per task
@task  # async task → stays in one loop/thread
async def scrape(urls):
    """Collect the document title of every URL in one chunk.

    Playwright is started, used and shut down inside this coroutine, so every
    Playwright object is created and used on the same event loop.

    Args:
        urls: The URLs to visit, one after another, on a single page.

    Returns:
        A list with one title string per URL, in input order. A page
        without a title contributes an empty string.
    """
    titles = []
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            context = await browser.new_context()
            page = await context.new_page()
            for u in urls:
                await page.goto(u)
                # Ask the browser for the title instead of splitting the HTML
                # on "<title>": the split raises IndexError when the tag is
                # missing or carries attributes, which would abort the chunk.
                titles.append(await page.title())
        finally:
            # Shut the browser down even if a navigation fails part-way.
            await browser.close()
    return titles
@flow(task_runner=ConcurrentTaskRunner())  # chunks run in parallel threads
def main(urls):
    """Split ``urls`` into CHUNK-sized batches, scrape them and merge the titles.

    Args:
        urls: Sequence of URLs to scrape.

    Returns:
        A flat list of the titles from every batch, in batch order.
    """
    batches = []
    for start in range(0, len(urls), CHUNK):
        batches.append(urls[start:start + CHUNK])

    # One scrape task per batch
    pending = scrape.map(batches)

    collected = []
    for future in pending:
        # result() waits for the batch and yields its list of titles
        collected.extend(future.result())
    return collected
if __name__ == "__main__":
    # Demo run: scrape the same URL 800 times and show the first three titles
    demo_urls = ["https://www.ard.de/"] * 800
    all_titles = main(demo_urls)
    print(all_titles[:3])