How does connection pooling work in python multiprocessing?


With Python multiprocessing, let's assume I'm creating 10 processes by forking the main process. We know that a child process inherits the state of the parent process in its own memory space.

I have a SQLAlchemy engine created in the parent process with a pool size of 5. At the time of forking there aren't any connections in the pool.

Since the child processes have their own memory space, the inherited engine should be accessible only to them, right?

  1. If that's the case, does it mean the total number of active connections possible would be 10 [number of processes] * 5 [pool_size] = 50?
  2. If not, how does the connection pool work here?

    import multiprocessing
    
    from sqlalchemy import create_engine, text
    
    # Create an engine with a connection pool in the parent process
    engine = create_engine('sqlite:///example.db', pool_size=5)
    
    def initializer():
        # Replace the inherited pool with a fresh one. close=False leaves any
        # connections inherited from the parent untouched instead of closing them.
        engine.dispose(close=False)
    
    def get_stuff():
        for _ in range(5):
            with engine.connect() as conn:
                conn.execute(text("..."))
    
    
    with multiprocessing.Pool(10, initializer=initializer) as pool:
        results = [pool.apply_async(get_stuff) for _ in range(10)]
        # Collect results
        output = [result.get() for result in results]

Solution

  • Your question is a bit confusing because it weaves back and forth between 2 scenarios.

    Also, the exact maximum number of connections is a little more complex because there is also an overflow setting. The default pool (QueuePool) allows at most pool_size + max_overflow connections. The rough idea is that you avoid keeping a large number of connections open ALL the time but still allow a burst of extra connections when there is demand. The specifics can be found here: sqlalchemy.pool.QueuePool.params.max_overflow
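
    To make the arithmetic concrete, here is a small sketch of the upper bound for the setup in the question. The helper name max_connections is made up for illustration, and it assumes every child process ends up with its own independent pool. The defaults pool_size=5 and max_overflow=10 are the documented QueuePool defaults.

```python
def max_connections(num_processes, pool_size=5, max_overflow=10):
    """Upper bound on simultaneously open connections across all processes.

    Hypothetical helper: assumes one independent pool per process.
    """
    per_process_limit = pool_size + max_overflow
    return num_processes * per_process_limit


# Setup from the question: 10 worker processes, pool_size=5, default overflow.
print(max_connections(10))                  # 150
# With overflow disabled, only pool_size counts.
print(max_connections(10, max_overflow=0))  # 50
# Two processes with the default settings.
print(max_connections(2))                   # 30
```

    So the 50 in the question is only the ceiling when max_overflow=0. With the defaults the ceiling is 150, and it is only reached if each process actually asks for that many connections at once.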

    Now to handle the cases:

    Note that if you are not using threading or asyncio in the processes, you will not exceed a single connection per process, even though the engine would allow more. Assuming the defaults, and as long as each connection is released before the next one is requested, the first connect in a process creates a new connection and every later connect just gets that same connection back from the pool.
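
    The reuse described above can be illustrated with a toy pool built on the standard library. This is only a sketch of the idea, not SQLAlchemy's implementation: ToyPool, checkout and checkin are invented names, and the "connections" are plain strings.

```python
import queue


class ToyPool:
    """Toy stand-in for a connection pool; NOT SQLAlchemy's implementation."""

    def __init__(self, pool_size):
        self._idle = queue.Queue(maxsize=pool_size)
        self.created = 0

    def checkout(self):
        # Reuse an idle connection if there is one, otherwise create a new one.
        try:
            return self._idle.get_nowait()
        except queue.Empty:
            self.created += 1
            return f"conn-{self.created}"

    def checkin(self, conn):
        # Return the connection to the pool so the next checkout can reuse it.
        self._idle.put_nowait(conn)


pool = ToyPool(pool_size=5)
seen = set()
for _ in range(5):
    conn = pool.checkout()   # like entering `with engine.connect() as conn:`
    seen.add(conn)
    pool.checkin(conn)       # like leaving the `with` block

print(pool.created, sorted(seen))  # 1 ['conn-1']
```

    Because the single-threaded loop always returns its connection before asking for another, the pool never needs to create a second one, no matter how large pool_size is.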

    Finally, as I understand it, you would in theory only use a large number of processes (10, say) in a multiprocessing pool with SQLAlchemy if each process pulled data and then did a large amount of CPU-bound work. If most of the work is just waiting on the database, you might want to use threads instead.

    Example

    Here is an example that uses a single process to start 2 child processes. Each of them starts more threads than its pool allows. If you look at the debugging output, enabled with echo_pool="debug", you can see that 30 connections (2 child processes * (5 pool size + 10 max overflow)) are created immediately. Once the limit is hit, though, the excess threads (intentionally 5 more than the limit in each process) have to wait up to 30 seconds (pool_timeout=30) for the pool to give them a connection rather than creating more. The sleep is only 5 seconds, so as the earlier threads finish, the excess threads obtain connections and do not time out.

    import concurrent.futures
    import math
    import multiprocessing
    import os
    import threading
    import time
    
    from sqlalchemy import create_engine
    from sqlalchemy.sql import select
    
    POOL_SIZE = 5
    MAX_OVERFLOW = 10
    TOTAL_LIMIT = MAX_OVERFLOW + POOL_SIZE
    NUM_SUBPROCESSES = 2
    
    
    def get_engine(env):
        URI = "sqlite:///example.db"
        return create_engine(
            URI,
            echo_pool='debug',
            pool_size=POOL_SIZE,
            max_overflow=MAX_OVERFLOW,
            # This is the default and should let workers still finish
            # but show the queuing.
            pool_timeout=30,
        )
    
    
    def is_prime(n):
        """ Simple is_prime taken straight from Python doc concurrent.futures example. """
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
    
        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True
    
    
    engine = get_engine(os.environ)
    
    
    def initializer():
        # Replace the inherited pool with a fresh one. close=False leaves any
        # connections inherited from the parent untouched instead of closing them.
        engine.dispose(close=False)
    
    def stall(index):
        with engine.connect() as conn:
            print (f"Connected in process:thread={os.getpid()}:{threading.get_ident()}")
            time.sleep(5)
            # Just feed the index in and out of the database, doing nothing.
            db_result = conn.execute(select(index)).scalar()
            # Then check if prime.
            return is_prime(db_result)
    
    def fill_up_pool():
        results = {}
        # Start up threads in subprocess.
        with concurrent.futures.ThreadPoolExecutor(max_workers=TOTAL_LIMIT+5) as executor:
            future_to_index = {executor.submit(stall, x): x for x in range(TOTAL_LIMIT+5)}
            for future in concurrent.futures.as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    result = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (index, exc))
                else:
                    print(f"{index} is {'prime' if result else 'composite'}")
                    # Record only successful results; after an exception
                    # `result` would be unset or left over from an earlier loop.
                    results[index] = result
        return results
    
    
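    # Start up subprocesses. The guard keeps this from running again if a
    # child re-imports the module (as happens with the "spawn" start method).
    if __name__ == "__main__":
        with multiprocessing.Pool(NUM_SUBPROCESSES, initializer=initializer) as pool:
            sub_results = [pool.apply_async(fill_up_pool) for _ in range(NUM_SUBPROCESSES)]
            # Collect result and print.
            print([result.get() for result in sub_results])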
    
    

    Debugging output after filtering for "Created new connection" (exactly 30 lines, as predicted):

    2024-06-26 17:36:36,193 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf5b0>
    2024-06-26 17:36:36,193 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf5b0>
    2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf880>
    2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801120>
    2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf880>
    2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801b70>
    2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801120>
    2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8023e0>
    2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801b70>
    2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8023e0>
    2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802890>
    2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802c50>
    2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802890>
    2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803010>
    2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802b60>
    2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8033d0>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803010>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803790>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803a60>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8033d0>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803790>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803f10>
    2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803b50>
    2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803f10>
    2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8310>
    2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a86d0>
    2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8310>
    2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8a90>
    2024-06-26 17:36:36,199 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a86d0>
    2024-06-26 17:36:36,199 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8a90>
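
    The queuing behaviour in the example (excess threads waiting for a free connection instead of a new one being created) can also be modelled without a database. The sketch below is a rough analogy using a semaphore with a timeout. It is not how SQLAlchemy implements its pool, and LIMIT, WORKERS, HOLD_SECONDS and TIMEOUT are scaled-down stand-ins chosen for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 3           # stands in for pool_size + max_overflow
WORKERS = 5         # intentionally more workers than the limit
HOLD_SECONDS = 0.2  # stands in for the 5 second sleep
TIMEOUT = 2.0       # stands in for pool_timeout

slots = threading.BoundedSemaphore(LIMIT)
lock = threading.Lock()
stats = {"in_use": 0, "peak": 0}


def worker(index):
    # Wait up to TIMEOUT for a free slot, like a checkout from a full pool.
    if not slots.acquire(timeout=TIMEOUT):
        return False  # would correspond to a pool timeout error
    try:
        with lock:
            stats["in_use"] += 1
            stats["peak"] = max(stats["peak"], stats["in_use"])
        time.sleep(HOLD_SECONDS)  # hold the "connection" for a while
        with lock:
            stats["in_use"] -= 1
        return True
    finally:
        slots.release()  # hand the slot to a waiting worker


with ThreadPoolExecutor(max_workers=WORKERS) as executor:
    outcomes = list(executor.map(worker, range(WORKERS)))

print(outcomes, stats["peak"])
```

    Every worker should succeed because the hold time is much shorter than the timeout, and the peak number of slots in use never exceeds LIMIT. Setting TIMEOUT below HOLD_SECONDS would make the excess workers give up instead.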