Tags: python, python-3.x, multithreading, python-multithreading, python-contextvars

Intermittently losing ContextVar when passing from parent to child thread


I have a subclass of Thread that I use across my project. In this class, I pass the ContextVar value to the child thread manually. However, occasionally (once or twice a day), I notice that the ContextVar in the child thread is not set and reads back its default value.

class MyThread(Thread):
    def __init__(
        self,
        group: None = None,
        target: Callable[..., Any] | None = None,
        name: str | None = None,
        args: tuple[Any, ...] = (),
        kwargs: dict[str, Any] | None = None,
        *,
        daemon: bool | None = None,
    ):
        super().__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)
        self.my_resource = get_resource_info()

    def run(self):
        self._exception = None
        try:
            set_my_resource_info(self.my_resource.name, self.my_resource.kind)
            self._return_value = super().run()
        except BaseException as e:
            self._exception = e

    def join(self, timeout: float | None = None):
        super().join(timeout)
        if self._exception:
            raise self._exception
        return self._return_value

And in another module I have:

@dataclass
class MyResourceInfo:
    name: str
    kind: str = "unknown"


resource_info: ContextVar[MyResourceInfo] = ContextVar(
    'my_resource_info',
    default=MyResourceInfo(name=get_default_resource_name()),
)

def set_my_resource_info(name: str, kind: str = 'unknown') -> Token[MyResourceInfo]:
    return resource_info.set(MyResourceInfo(name=name, kind=kind))

Why does the context var intermittently revert to its default value in child threads?


Solution

  • I could not reproduce the problem.

    Sorry, I already discussed in the comments what you can possibly do, but since this could reveal a serious bug in Python's contextvars implementation, I really tried to reproduce the problem.

    I came up with the following script and ran it with different Python versions (3.10.7, 3.13.0, 3.13.0t) on Fedora Linux, and also with Python 3.10.15 in Docker (python:3.10-slim-bookworm, as indicated in the context), spinning up from 1000 to 1_000_000 threads as fast as possible, with delays added at several different points and in several combinations. Not a single time did the ContextVar fail to be set.

    (I also tried, as visible in the commented-out lines, setting it in the regular target function, using the default threading.Thread class.)

    contextkabum.py:

    import threading
    import contextvars
    import time
    import sys
    
    var = contextvars.ContextVar("var", default=0)
    errors = []
    delay = sys.getswitchinterval() * 3
    
    
    class T(threading.Thread):
        def run(self):
            # Set the variable inside the new thread, before the target runs.
            #time.sleep(delay)
            var.set(42)
            #time.sleep(delay)
            #time.sleep(0.001)
            return super().run()
    
    
    def target():
        #var.set(42)
        #time.sleep(0.001)
        if (x:=var.get()) != 42:
            errors.append(y:=(threading.current_thread(), time.time(), x))
            print(y)
        time.sleep(0.001)
    
    def doit(n=1_000_000):
        threads = []
        for i in range(n):
            threads.append(T(target=target))
        for i, t in enumerate(threads):
            t.start()
            if not i % 30:
                pass
                #time.sleep(.01)
            if not i % 100:
                print(i)
        for t in threads:
            t.join()
        print(errors)
    
    
    doit()
    

    Dockerfile:

    FROM python:3.10-slim-bookworm
    
    COPY contextkabum.py /root/
    
    CMD python /root/contextkabum.py
    
    

    Workaround:

    As stated in the comments, for the sake of your working environment, just add an explicit check (which would be redundant if the problem never occurred) and re-set the ContextVar in question if it still holds the default:

    
        def run(self):
            self._exception = None
            try:
                set_my_resource_info(self.my_resource.name, self.my_resource.kind)
                if check_resource_is_default():
                    time.sleep(0.005)  # or use sys.getswitchinterval() to avoid hardcoding 0.005
                    set_my_resource_info(self.my_resource.name, self.my_resource.kind)
    
                self._return_value = super().run()
            except BaseException as e:
                self._exception = e
    
    def check_resource_is_default():
        ...
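
The body of check_resource_is_default is left open above. One possible sketch, assuming a module shaped like the one in the question, calls ContextVar.get with a private sentinel, so that "never set in this context" can be told apart from "set to a value that happens to equal the default". The MyResourceInfo stand-in, the placeholder default name, and the _UNSET sentinel are assumptions for illustration, not the asker's actual code:

```python
from contextvars import ContextVar
from dataclasses import dataclass


@dataclass
class MyResourceInfo:
    name: str
    kind: str = "unknown"


# Stand-in for the question's module-level variable. The default name is a
# placeholder (assumption), not the real get_default_resource_name().
resource_info: ContextVar[MyResourceInfo] = ContextVar(
    "my_resource_info",
    default=MyResourceInfo(name="default-resource"),
)

# Private sentinel (hypothetical) that can never be a legitimately stored value.
_UNSET = object()


def check_resource_is_default() -> bool:
    """Return True if resource_info has no value set in the current context."""
    # ContextVar.get(fallback) returns the fallback, rather than the variable's
    # own default, when nothing has been set in the current context.
    return resource_info.get(_UNSET) is _UNSET
```

If this check ever returns True immediately after the first set call in run(), logging the thread name at that point would give a concrete data point for a bug report.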