python timer uptime

How can I get an equivalent of Python threading.Timer that uses system uptime?


TL;DR: threading.Timer uses the system time, but the system time changes while my script is running. How can I make it use system uptime instead?

I have a Python script that does several things, one of which is setting the system time. When the script starts, the system time is wrong. The script also needs a global timeout of 30 seconds.

I have been using the following timeout class:

import os
import signal
import threading


class TimeoutException(Exception):
    pass


class Timeout(object):
    def __init__(self, seconds=1, signum=signal.SIGUSR1, exception=TimeoutException):
        self.exception = exception
        self.pid = os.getpid()
        self.signum = signum
        self.timer = threading.Timer(seconds, self.exit_function)

    def exit_function(self):
        # runs in the timer thread: signal our own process
        os.kill(self.pid, self.signum)

    def handle_timeout(self, signum, frame):
        # runs in the main thread when the signal arrives
        raise self.exception()

    def __enter__(self):
        signal.signal(self.signum, self.handle_timeout)
        self.timer.start()

    def __exit__(self, type, value, traceback):
        self.timer.cancel()

It wraps my entire script:

with Timeout(seconds=30):
    main()

Occasionally the script fails almost immediately, or it never gets killed after the 30 seconds. I believe this is because threading.Timer uses the system time, which gets changed while the script is running. Is there any way I can get it to use system uptime?
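
The suspected failure mode can be illustrated with a small simulation. This is only a sketch: FakeClock and has_expired are hypothetical names, and the comparison is a simplified stand-in for what a wall-clock-based timer effectively does, not the actual threading internals.

```python
class FakeClock:
    """Hypothetical stand-in for a wall clock whose reading can be adjusted."""

    def __init__(self, now=0.0):
        self.now = now

    def __call__(self):
        return self.now


def has_expired(clock, deadline):
    # Simplified version of the check a wall-clock-based timer makes.
    return clock() >= deadline


wall = FakeClock(now=1000.0)
deadline = wall() + 30           # 30-second timeout set while the clock is wrong

wall.now += 3600                 # the system time is corrected forward by an hour
fired_early = has_expired(wall, deadline)    # deadline already looks passed

wall.now = 1000.0 - 3600         # alternatively, the time is set back by an hour
wall.now += 30                   # 30 real seconds then pass
fired_on_time = has_expired(wall, deadline)  # deadline still looks far away
```

A forward jump makes the timeout appear to fire at once, and a backward jump delays it by roughly the size of the jump, which would match both symptoms described above.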


Solution

  • Update

    I now monkey-patch threading._time with a monotonic clock function from the monotonic package on PyPI, so that a change to the system time no longer affects the timer.

    import threading
    import monotonic
    
    threading._time = monotonic.monotonic
    

    Original answer

    I ended up extending threading.Timer to use system uptime.

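    import ctypes
    import struct
    import threading
    import time
    
    # threading._Timer is the Python 2 class behind the threading.Timer factory
    class Timer(threading._Timer):
    
        def __init__(self, *args, **kwargs):
            super(Timer, self).__init__(*args, **kwargs)
    
            # only works on Linux
            self._libc = ctypes.CDLL('libc.so.6')
            # assumed large enough to hold struct sysinfo
            self._buf = ctypes.create_string_buffer(128)
    
        def uptime(self):
            # sysinfo(2) fills the buffer; its first field is the uptime
            # in whole seconds, stored as a C long
            self._libc.sysinfo(self._buf)
            return struct.unpack_from('@l', self._buf.raw)[0]
    
        def run(self):
            start_time = self.uptime()
            # poll until the timer is cancelled or the interval has elapsed
            while not self.finished.is_set():
                time.sleep(0.1)
                if self.uptime() - start_time > self.interval:
                    self.function(*self.args, **self.kwargs)
                    break
            self.finished.set()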
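
    For comparison, the same polling idea can be sketched without ctypes on Python 3.3 or later, where time.monotonic() is part of the standard library. This is a rough sketch rather than a drop-in replacement: the class name MonotonicTimer and the poll_interval attribute are made up for illustration, and it has not been tested against an actual system time change.

```python
import threading
import time


class MonotonicTimer(threading.Timer):
    """Hypothetical Timer variant that measures its interval with time.monotonic()."""

    poll_interval = 0.05  # assumed polling granularity, in seconds

    def run(self):
        # The deadline is taken from the monotonic clock, which is not
        # adjusted when the system time is set.
        deadline = time.monotonic() + self.interval
        while not self.finished.is_set():
            if time.monotonic() >= deadline:
                self.function(*self.args, **self.kwargs)
                break
            time.sleep(self.poll_interval)
        self.finished.set()
```

    It would be used like the class above, for example self.timer = MonotonicTimer(seconds, self.exit_function) inside Timeout.__init__. cancel() still works because it sets the same finished event that the loop checks, at the cost of waiting up to one polling interval before the thread exits.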