Tags: python, multiprocessing, python-multiprocessing

Python: Logging to a single file with QueueHandler when Process workers contain a ProcessPoolExecutor


I want to write logs to a single file from different Processes, each of which contains a ProcessPoolExecutor.

My application has the following structure:

Both the secondary process and the pool of processes are kept open all the time until the application closes.

The example below mirrors the architecture of my application. I am trying to set up a multiprocessing queue to handle all logs coming from the Process workers and the ProcessPoolExecutor, so that access to the file is safe.

I tried to use the example from the Python documentation and added the ProcessPoolExecutor, but in the example below the logs from the main process are not stored in the file. What am I missing?
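
For reference, the listener-process pattern from the logging cookbook that I started from boils down to roughly this (a stripped-down sketch, names are just illustrative):

import logging
import logging.handlers
import multiprocessing

def listener_process(queue):
    # The listener owns the file handler; everything else only sends records.
    root = logging.getLogger()
    root.addHandler(logging.FileHandler('mptest.log'))
    while True:
        record = queue.get()
        if record is None:  # sentinel
            break
        logging.getLogger(record.name).handle(record)

def worker(queue):
    # Workers only attach a QueueHandler, so every record funnels through
    # the queue to the single listener process that writes the file.
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.DEBUG)
    logging.getLogger('worker').info('hello from a worker')

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    listener = multiprocessing.Process(target=listener_process, args=(queue,))
    listener.start()
    w = multiprocessing.Process(target=worker, args=(queue,))
    w.start()
    w.join()
    queue.put(None)  # sentinel to stop the listener
    listener.join()

My full example, with the ProcessPoolExecutor added, is this: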

import logging
import logging.handlers
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import sys
import traceback
from random import choice

# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]

NUMBER_OF_WORKERS = 2
NUMBER_OF_POOL_WORKERS = 1
NUMBER_OF_MESSAGES = 1

LOG_SIZE = 1024  # 1 KB
BACKUP_COUNT = 10
LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
LOG_NAME = 'mptest.log'


def configure_logging_format():
    """
    Configure the listener process logging.
    """
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
    f = logging.Formatter(LOG_FORMAT)
    h.setLevel(logging.DEBUG)
    h.setFormatter(f)
    root.addHandler(h)


def main_process_listener(queue: multiprocessing.Queue):
    """
    This is the listener process top-level loop: wait for logging events
    (LogRecords) on the queue and handle them; quit when you get a None for a
    LogRecord.

    Parameters
    ----------
    queue: Queue
        Queue to get the log records from.
    """
    print('Trigger main listener.')
    configure_logging_format()
    while True:
        try:
            record = queue.get()
            if record is None:  # sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            traceback.print_exc(file=sys.stderr)


def broadcast_logs_from_pool_to_main_listener(pool_process_queue, main_process_queue):
    """
    This is the intermediate listener's top-level loop: wait for logging events from the pool
    processes and forward them to the main listener process.

    pool_process_queue: Queue
        Pool process queue to get the log records from.
    main_process_queue: Queue
        Main process queue to put the log records to.
    """
    print('Broadcasting logs from pool to main listener.')
    # configure_logging_format()
    while True:
        try:
            record = pool_process_queue.get()
            if record is None:  # sentinel to tell the listener to quit.
                break
            # TODO: apply level of filtering
            main_process_queue.put(record)
        except Exception:
            traceback.print_exc(file=sys.stderr)


def configure_logging_for_multiprocessing(queue):
    """
    The worker configuration is done at the start of the worker process run.
    Note that on Windows you can't rely on fork semantics, so each process
    will run the logging configuration code when it starts.
    """
    print('Configuring logging for multiprocessing...')
    h = logging.handlers.QueueHandler(queue)  # Handler needed to send records to queue
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


def pool_process(queue):
    configure_logging_for_multiprocessing(queue)

    name = multiprocessing.current_process().name
    print('Pool process started: %s' % name)
    logging.getLogger(name).log(choice(LEVELS), 'message')
    print('Pool process finished: %s' % name)


def worker_process(queue):
    """
    Worker process that logs messages to the queue.

    Parameters
    ----------
    queue: Queue
        Queue to log the messages to.
    """
    configure_logging_for_multiprocessing(queue)

    pool_queue = multiprocessing.Manager().Queue(-1)
    lp = multiprocessing.Process(target=broadcast_logs_from_pool_to_main_listener, args=(pool_queue, queue))
    lp.start()

    # Create ProcessPoolExecutor
    executor = ProcessPoolExecutor(max_workers=NUMBER_OF_POOL_WORKERS)
    for i in range(NUMBER_OF_POOL_WORKERS):
        executor.submit(pool_process, pool_queue)

    # Send message
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    logging.getLogger(name).log(choice(LEVELS), 'message')
    print('Worker finished: %s' % name)

    # Shutdown the executor and the listener
    executor.shutdown()
    pool_queue.put_nowait(None)


if __name__ == '__main__':
    main_logging_queue = multiprocessing.Manager().Queue()

    # Start the listener process
    lp = multiprocessing.Process(target=main_process_listener, args=(main_logging_queue,))
    lp.start()

    logging.getLogger('main_1').log(choice(LEVELS), 'main process 1')

    # Start the worker processes
    workers = []
    for i in range(NUMBER_OF_WORKERS):
        worker = multiprocessing.Process(target=worker_process, args=(main_logging_queue,))
        workers.append(worker)
        worker.start()

    # Log a message from the main process
    logging.getLogger('main_2').log(choice(LEVELS), 'main process 1')

    # Wait for all of the workers to finish
    for w in workers:
        w.join()

    main_logging_queue.put_nowait(None)
    lp.join()

Solution

  • I took the opportunity to simplify your example, but here's what I think you're going for (please read the code comments for more information). I also see some unrelated things that could cause issues in the future; in particular, nested process creation makes it very hard to ensure proper cleanup in a fault scenario. I would definitely recommend the tutorial on modern logging by mCoding on YouTube; it has lots of great information.

    import logging
    import logging.handlers
    import multiprocessing
    import threading
    from concurrent.futures import ProcessPoolExecutor
    
    LOG_SIZE = 2**16  # 64 KiB
    BACKUP_COUNT = 10
    LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
    LOG_NAME = 'mptest.log'
    
    def init_file_logger():
        root = logging.getLogger()
        h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
        f = logging.Formatter(LOG_FORMAT)
        h.setLevel(logging.DEBUG)
        h.setFormatter(f)
        root.addHandler(h)
        root.setLevel(logging.DEBUG)
    
    def init_q_logger(log_q):  # send log records to the log_q_reader thread
        root = logging.getLogger()
        h = logging.handlers.QueueHandler(log_q)
        root.addHandler(h)
        root.setLevel(logging.DEBUG)
    
    # Use a thread to receive logs so it can share the main process's logging config
    def log_q_reader(log_q: multiprocessing.Queue):
        root = logging.getLogger()
        for record in iter(log_q.get, None):
            root.handle(record)
    
    def pool_task(*args):
        logging.debug(f"pool task({args})")  # logs to queuehandler as configured by init_q_logger() inside pool initializer
    
    def process_task(log_q, *args):
        init_q_logger(log_q)  # init logging queue handler for child process
        logging.debug(f"process task({args})")  # logs to queuehandler as configured by init_q_logger()
    
        # Use the pool initializer function to set up the queue handler for grandchild processes.
        #  Also, grandchild processes are in my opinion a code smell. From the zen
        #  of python: "Flat is better than nested."
        with ProcessPoolExecutor(initializer=init_q_logger, initargs=(log_q,)) as pool:
            futures = []
            for x in [(1,2),(3,4),(5,6)]:
                futures.append(pool.submit(pool_task, x))
            for f in futures: #wait on all results before exiting context manager
                f.result()
    
    if __name__ == "__main__":
        init_file_logger()  # could inline this
        log_q = multiprocessing.Queue()  # A real queue (rather than a Manager proxy) can only be passed to the pool via initargs, not as a task-submission argument.
        log_thread = threading.Thread(target=log_q_reader, args=(log_q,))
        log_thread.start()
        logging.debug("before process1")  # logs directly to the rotatingfilehandler as was configured by init_file_logger()
        p = multiprocessing.Process(target=process_task, args=(log_q, "a", "b", "c"))
        p.start()
        p.join()
        logging.debug("before process2")
        p = multiprocessing.Process(target=process_task, args=(log_q, "x", "y", "z"))
        p.start()
        p.join()
        logging.debug("end")
        log_q.put(None)
        log_q.close()
        log_thread.join()
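
  • If you'd rather not hand-roll the queue-reader thread, the standard library's logging.handlers.QueueListener does the same job: it starts a background thread in the main process that pulls records off the queue and passes them to whatever handlers you give it. (With this variant the main process's own log calls don't go through the queue unless you also attach a QueueHandler(log_q), or the file handler, to the main process's root logger, which is what init_file_logger() does above.) A rough sketch, with the handler setup being purely illustrative:

    import logging
    import logging.handlers
    import multiprocessing

    if __name__ == "__main__":
        log_q = multiprocessing.Queue()
        h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 2**16, 10)
        h.setFormatter(logging.Formatter(
            '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'))
        # QueueListener drains log_q on its own internal thread and hands
        # each record to the handlers passed here.
        listener = logging.handlers.QueueListener(log_q, h)
        listener.start()

        # ... start the Process / ProcessPoolExecutor workers exactly as above,
        # each attaching a QueueHandler(log_q) in its initializer ...

        listener.stop()  # flushes remaining records and joins the internal thread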