Trying to solve https://leetcode.com/problems/web-crawler-multithreaded/
This code works (well at least for the toy test cases before eventually TLE'ing)
from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
    """Working attempt: N daemon worker threads each loop forever, pulling
    URLs from a shared Queue; Queue.join() in crawl() returns once every
    queued URL has been matched by a task_done() call."""
    def __init__(self):
        # URLs already fetched; guarded by visitLock.
        self.visited = set()
        # Work queue of URLs still to fetch; queue.Queue is thread-safe.
        self.frontier = queue.Queue()
        self.visitLock = Lock()
    def threadCrawler(self, htmlParser):
        """Worker loop: never returns (runs in a daemon thread), processing
        one URL per iteration."""
        while True:
            nextUrl = self.frontier.get()  # blocks until a URL is available
            urls = htmlParser.getUrls(nextUrl)
            with self.visitLock:
                self.visited.add(nextUrl)
            # Keep only links on the same host as the page just crawled.
            host = urlparse(nextUrl).hostname
            urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
            with self.visitLock:
                # NOTE(review): this check is not atomic with another worker's
                # later visited.add(), so a URL can be enqueued twice — result
                # is still correct (it's a set) but costs duplicate fetches.
                urls = list(filter(lambda x: x not in self.visited, urls))
            for url in urls:
                self.frontier.put(url)
            self.frontier.task_done()  # pairs with the get() above
    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        """Seed the queue, start the workers, wait for the queue to drain."""
        self.frontier.put(startUrl)
        n = 10
        for i in range(n):
            # daemon=True lets the process exit even though workers never return.
            Thread(target=self.threadCrawler, args=(htmlParser,), daemon=True).start()
        self.frontier.join()  # returns when every put() has a task_done()
        return self.visited  # NOTE(review): a set, despite the List[str] annotation
But this code which uses a ThreadPoolExecutor doesn't work - it times out in the toy examples even with a single thread.
from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
    """Broken ThreadPoolExecutor attempt (same worker as the Thread version).

    Why it hangs: executor.submit() has no ``daemon`` parameter, so the
    ``daemon=True`` below is forwarded as a keyword argument to
    threadCrawler, which does not accept it.  The worker therefore dies
    immediately with a TypeError that is silently captured inside the
    (discarded) Future; no URL is ever processed, no task_done() ever
    runs, and frontier.join() blocks forever -> TLE.
    """
    def __init__(self):
        # Same shared state as the working Thread version.
        self.visited = set()
        self.frontier = queue.Queue()
        self.visitLock = Lock()
    def threadCrawler(self, htmlParser):
        # Identical endless worker loop; it accepts no extra keyword
        # arguments, which is what makes the submit() call below fatal.
        while True:
            nextUrl = self.frontier.get()
            urls = htmlParser.getUrls(nextUrl)
            with self.visitLock:
                self.visited.add(nextUrl)
            host = urlparse(nextUrl).hostname
            urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
            with self.visitLock:
                urls = list(filter(lambda x: x not in self.visited, urls))
            for url in urls:
                self.frontier.put(url)
            self.frontier.task_done()
    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 1
        executor = ThreadPoolExecutor(max_workers=n)
        for i in range(n):
            # BUG: daemon=True is passed to threadCrawler, not the executor.
            executor.submit(self.threadCrawler,htmlParser, daemon=True)
        self.frontier.join()  # never returns: the worker crashed on entry
        return self.visited
Even when I remove the daemon parameter, store the futures and check their result, it still results in a TLE
def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
    """Variant without the bogus daemon kwarg; still TLEs.

    Here the worker really does run, and frontier.join() does return once
    the queue drains.  The hang is futures[i].result(): threadCrawler is a
    ``while True`` loop that never returns, so its Future never completes
    and result() blocks forever.
    """
    self.frontier.put(startUrl)
    n = 1
    executor = ThreadPoolExecutor(max_workers=n)
    futures = []
    for i in range(n):
        futures.append(executor.submit(self.threadCrawler,htmlParser))
    self.frontier.join()
    for i in range(n):
        # Blocks forever: the submitted function is designed never to return.
        print(futures[i].result())
    return self.visited
Your concurrent.futures version doesn't follow the executor's intended pattern: you launch a "run forever" server function that pulls task targets from a queue — which is how a manually managed threading.Thread worker should behave, not an executor task.
Functions submitted to an executor should have a limited lifetime and simply return their result, rather than running a `while True: target = queue.get()`
loop.
(That fetching-from-a-queue pattern does happen internally, but the queue, the callables and their parameters, and the dispatching are all managed by the executor itself, so one doesn't have to bother with them.)
Since your task function
is never designed to return, it is only natural that its Future times out!
Now, moving on: since the worker function is the one responsible for finding new task targets (the crawled URLs), for this example it may be more natural to just use threading.Thread and manage the Queue in your own code, because concurrent.futures
is not designed so that one task can directly post other tasks.
In this model, a task should run once, get its results, return them back, and the main thread is the part that should create new tasks for the fetched URLs, if needed. It can be done with some refactoring -
Here is a refactored example using a concurrent.futures pattern. I tried to keep the code a bit simpler by retrieving just one item from as_completed
at a time. (I can't test it, though, since I don't have your HtmlParser class — which also does the actual I/O — or a sample site to try it against.)
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from urllib.parse import urljoin, urlparse
class Solution:
    """Refactored crawler: each submitted task is one-shot (fetch one URL,
    return its same-host links); the main thread owns all mutable state and
    submits new tasks for newly discovered URLs."""
    def __init__(self):
        # Populated by crawl(); kept for interface parity with the question.
        self.visited = set()

    def threadCrawler(self, htmlParser, next_url):
        """Fetch one URL and return the set of same-host links it contains.

        Runs in a worker thread but touches no shared state, so no lock is
        needed.  Returns (not loops) — that is what makes it a valid task
        for a ThreadPoolExecutor.
        """
        urls = htmlParser.getUrls(next_url)
        # BUG FIX: the original referenced the undefined name `nextUrl`
        # here (the parameter is `next_url`), raising NameError in every task.
        host = urlparse(next_url).hostname
        return set(filter(lambda x: urlparse(x).hostname == host, urls))

    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        """Crawl all URLs reachable from startUrl on the same host."""
        n = 1  # or whatever number of workers
        visited = {startUrl}
        with ThreadPoolExecutor(max_workers=n) as executor:
            tasks = set()
            next_urls = {startUrl}
            while tasks or next_urls:
                # Submit one task per URL discovered by the last completed task.
                for url in next_urls:
                    tasks.add(executor.submit(self.threadCrawler, htmlParser, url))
                # Wait for a single completed task and harvest its links.
                completed = next(as_completed(tasks))
                tasks.remove(completed)
                urls = completed.result()  # wrap in try/except if getUrls may raise
                next_urls = urls - visited  # only URLs never seen before
                visited.update(urls)       # mark everything seen to avoid resubmission
        self.visited = visited
        return self.visited  # NOTE(review): a set, despite the List[str] annotation
So, note that the mutable structure of interest (self.visited) is manipulated only in the main thread, and the message passing to the threads is managed by the executor - so the user code (our code) doesn't need any of that. As a result, the crawler method went from 11 down to 3 LoC in this example (and your original code was even using the locks somewhat incorrectly).