pythonmultithreadingqueuethreadpoolexecutor

Why does my code time out with ThreadPoolExecutor but not with normal Threads


Trying to solve https://leetcode.com/problems/web-crawler-multithreaded/

This code works (well at least for the toy test cases before eventually TLE'ing)

from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
    def __init__(self):
        self.visited = set()
        self.frontier = queue.Queue()
        self.visitLock = Lock()
        
    def threadCrawler(self, htmlParser):
        while True:
            nextUrl = self.frontier.get()
            urls = htmlParser.getUrls(nextUrl)
            with self.visitLock:
                self.visited.add(nextUrl)
            host = urlparse(nextUrl).hostname
            urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
            with self.visitLock:
                urls = list(filter(lambda x: x not in self.visited, urls))
            for url in urls:
                self.frontier.put(url)
            self.frontier.task_done()

    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 10
        for i in range(n):
            Thread(target=self.threadCrawler, args=(htmlParser,), daemon=True).start()
        self.frontier.join()
        return self.visited

But this code which uses a ThreadPoolExecutor doesn't work - it times out in the toy examples even with a single thread.

from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
    def __init__(self):
        self.visited = set()
        self.frontier = queue.Queue()
        self.visitLock = Lock()
        
    def threadCrawler(self, htmlParser):
        while True:
            nextUrl = self.frontier.get()
            urls = htmlParser.getUrls(nextUrl)
            with self.visitLock:
                self.visited.add(nextUrl)
            host = urlparse(nextUrl).hostname
            urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
            with self.visitLock:
                urls = list(filter(lambda x: x not in self.visited, urls))
            for url in urls:
                self.frontier.put(url)
            self.frontier.task_done()

    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 1
        executor = ThreadPoolExecutor(max_workers=n)
        for i in range(n):
            executor.submit(self.threadCrawler,htmlParser, daemon=True)
        self.frontier.join()
        return self.visited

Even when I remove the daemon parameter, store the futures and check their result, it still results in a TLE

 def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 1
        executor = ThreadPoolExecutor(max_workers=n)
        futures = []
        for i in range(n):
            futures.append(executor.submit(self.threadCrawler,htmlParser))
        self.frontier.join()
        for i in range(n):
            print(futures[i].result())
        return self.visited

Solution

  • You are not using the concurrent.futures version of your code with the correct pattern: you launch a "run forever" server function which takes task-targets from a queue, just like some non-managed threading function should be.

    Future-like functions, should have a limited lifetime, and just return their result, not a while True:\n target = queue.get() pattern. (That actually happens, but the queue, callable and parameters fetching and dispatching is code internal to the executor, so one doesn't have to botter).

    Since your Future is not designed to ever return, it is just natural that it times-out!

    Now, moving on: once the worker function is the one responsible for finding new task-targets (the crawled URLs), maybe for this example, it is more natural to just use threading.Thread, and manage the Queue in your own code: because concurrent.Futures is not designed in a way that one task can directly post other tasks.

    In this model, a task should run once, get its results, return them back, and the main thread is the part that should create new tasks for the fetched URLs, if needed. It can be done with some refactoring -

    Here is a refactored example, using a concurrent.futures pattern I tried to make the code a bit simpler by retrieving just one item from as_completed at a time. (but I can't test it since I don't have your HtmlParser class, which also does the actual I/O or sample site to try it)

    
    from urllib.parse import urljoin, urlparse
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    
    class Solution:
        def __init__(self):
            self.visited = set()
            
        def threadCrawler(self, htmlParser, next_url):
            urls = htmlParser.getUrls(next_url)
            host = urlparse(nextUrl).hostname
            return set(filter(lambda x: urlparse(x).hostname == host, urls))
    
        def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
            #self.frontier.put(startUrl)
            n = 1 # or whatever number
            
            visited = {startUrl}
            with ThreadPoolExecutor(max_workers=n) as executor:
                tasks = set()
                next_urls = {startUrl}
                while tasks or next_urls:
                    for url in next_urls:
                        # create as many new tasks as the new_urls fetched by the last completed task:
                        tasks.add(executor.submit(self.threadCrawler, htmlParser, url))
                    
                    # fetch a single, completed task
                    completed = next(as_completed(tasks))
                    tasks.remove(completed)
                    urls = completed.result()  # Do this inside a try/except if the task code may raise any  exceptions
                    next_urls = urls - visited # get the new_urls fetched by the task
                    visited.update(urls)  # update the found URLs 
                    
            self.visited = visited 
            return self.visited
                
    

    So, note that the mutable structure of interest (self.visited) is manipulated only in the main thread, and the message passing to the threads is managed by the executor - so the user code (our code) doesn't need any of that. As a result, the crawler method went from 11 down to 3 LoC in this example (and your original code was even using the locks somewhat incorrectly).