python python-3.x multiprocessing mysql-connector

How do I initialize a pool of worker processes with database connections in Python?


I am trying to set up a pool of worker processes where each one has a set of database connection pools

This is the class definition:


class MultiprocessingManager:
    """Singleton that owns one multiprocessing worker pool.

    Every ``MultiprocessingManager(...)`` call returns the same instance. The
    pool is created only the first time that instance is initialized; the IP
    arguments of later calls are ignored.
    """

    _instance = None  # Class variable to hold the singleton instance

    def __new__(cls, local_ip, linked_ip, server_ip):
        """
            Ensures that MultiprocessingManager is only created once
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every MultiprocessingManager(...) call, so the
            # instance carries a flag that lets it skip repeated set-up.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, local_ip, linked_ip, server_ip):
        """
            Creates the worker pool on first initialization only

            local_ip, linked_ip and server_ip are forwarded to the worker
            initializer through ``initargs``.
        """
        if not self._initialized:
            print("Initializing MultiprocessingManager instance...")
            # Pool needs the initializer *callable* plus its arguments.
            # Writing initializer=self._init_worker(...) would run it once in
            # the parent process and hand Pool its None return value, so no
            # worker would ever be initialized.
            # NOTE(review): with the 'spawn' start method the bound method,
            # and therefore this instance, is pickled for each worker -
            # confirm that works on the target platform, or switch to a
            # module-level initializer function.
            self.pool = Pool(
                processes=multiprocessing.cpu_count(),
                initializer=self._init_worker,
                initargs=(local_ip, linked_ip, server_ip),
            )
            self._initialized = True
            print("MultiprocessingManager instance initialized.")

And this is the _init_worker function to try and set up the connections

    def _init_worker(self, local_ip, linked_ip, server_ip):
        """
            Used to initialize each PID worker with dedicated DBManagers

            The three managers are bound to module-level globals. As plain
            locals they would be discarded as soon as this function returns,
            and no task running later in the same process could reach them.
            Task functions defined in another module have to read them as
            attributes of this module.
        """
        global local_connection, linked_connection, server_connection

        local_connection = DatabaseManager(
            host_ip=local_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=local_ip,
            db_port=3306,
            db_pool_name='local_db_pool',
            db_pool_size=5
        )
        linked_connection = DatabaseManager(
            host_ip=linked_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=linked_ip,
            db_port=3306,
            db_pool_name='linked_db_pool',
            db_pool_size=5
        )
        server_connection = DatabaseManager(
            host_ip=server_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=server_ip,
            db_port=3306,
            db_pool_name='server_db_pool',
            db_pool_size=5
        )

In the DatabaseManager class, this is how I was setting up the connections to MySQL:

            # Set-up code of DatabaseManager (the enclosing def is not shown):
            # stores the connection settings, opens a socket, then builds a
            # MySQL connection pool.
            print('DatabaseManager instantiation...')
            # Keep the connection settings on the instance.
            self.host_ip = host_ip
            self.host_username = host_username
            self.db_password = db_password
            self.db_name = db_name
            self.db_ip = db_ip
            self.db_port = db_port
            # SOCK_DGRAM makes this a UDP socket pointed at (db_ip, db_port).
            # NOTE(review): nothing in the lines shown here uses or closes
            # this socket, and the pool below does not receive it - confirm
            # it is actually needed.
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.socket.connect((self.db_ip, self.db_port))

            # Named pool of `db_pool_size` MySQL connections.
            # NOTE(review): the pool is given host_ip as `host` and no `port`;
            # db_ip / db_port are stored above but not passed here - confirm
            # that is intended.
            self.db_connection = mysql.connector.pooling.MySQLConnectionPool(
                pool_name=db_pool_name,
                pool_size=db_pool_size,
                pool_reset_session=True,
                host=self.host_ip,
                user=self.host_username,
                password=self.db_password,
                database=self.db_name)

With regard to multiprocessing, what was being done is that worker functions were created in separate files, each containing its own db connections:

def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    """Write one program-log row to the server database.

    A new DatabaseManager (and with it a new connection pool) is built on
    every call before the row is written.

    NOTE(review): `server_ip` is not a parameter; it has to exist as a
    module-level name wherever this function is defined - confirm.
    """
    # The whole body sits at one indentation level; mixing 8-space and
    # 4-space lines here is an IndentationError.
    server_connection = DatabaseManager(
        host_ip=server_ip,
        host_username='something',
        db_password='something',
        db_name='something',
        db_ip=server_ip,
        db_port=3306,
        db_pool_name='server_db_pool',
        db_pool_size=5
    )
    server_connection.write_master_program_log_table(status_code, parameters, log_datetime)

And the functions are used in multiprocessing context like this

    def log_status_code_to_server(self, status_code: StatusCodes, params: Optional[dict], timestamp: datetime):
        """Queue a program-log write on the worker pool without waiting for it.

        The call returns as soon as the task has been submitted; "logged!"
        therefore means "queued", not "written". A failure inside the worker
        is printed by the error callback instead of being dropped.
        """
        def _report_failure(exc):
            # Called in the parent process with the exception the task raised.
            print(f"log_program_log_to_server failed: {exc!r}")

        print("logging...")
        # The AsyncResult is not kept, so without an error_callback an
        # exception raised in the worker would never surface anywhere.
        self.pool.apply_async(
            log_program_log_to_server,
            args=(status_code, params, timestamp),
            error_callback=_report_failure,
        )
        print("logged!")

But this isn't ideal, since the connections would be recreated every time this function is called. Instead, each process in the multiprocessing pool should be initialized with its own db connections already in place.

So my question is, if I do worker initialization using the _init_worker function from above, how exactly do I make use of those connection pools?

Any help appreciated!


Solution

  • What I would do is create a pool initializer that defines your connection pools as global variables, for example:

    def init_pool_processes(local_ip, linked_ip, server_ip):
        """Pool initializer: build the three DatabaseManager objects as globals.

        Meant to be passed to Pool as its `initializer`, so the names below
        exist as module-level globals for the worker functions to use.
        The `...` lines stand for the remaining DatabaseManager arguments.
        """
        # `global` makes the assignments below bind module-level names
        # rather than locals that would vanish when this function returns.
        global local_connection, linked_connection, server_connection
    
        local_connection = DatabaseManager(
            ...
            db_ip=local_ip,
            ...
        )
    
        linked_connection = DatabaseManager(
            ...
            db_ip=linked_ip,
            ...
        )
    
        server_connection = DatabaseManager(
            ...
            db_ip=server_ip,
            ...
        )
    

    Then your MultiprocessingManager.__init__ method would look like:

                ...
                # Hand Pool the initializer function itself and give its
                # arguments through initargs. Writing
                # initializer=init_pool_processes(...) would call it once in
                # the parent process and pass Pool its None return value, so
                # the workers would never get their connections.
                self.pool = Pool(
                    processes=multiprocessing.cpu_count(),
                    initializer=init_pool_processes,
                    initargs=(local_ip, linked_ip, server_ip),
                )
                ...
    

    Finally, your worker function becomes:

    def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
        """Write one program-log row through the shared server connection.

        `server_connection` is not created here; it is looked up as a global
        that the pool initializer is expected to have set beforehand.
        """
        write_row = server_connection.write_master_program_log_table
        write_row(status_code, parameters, log_datetime)
    

    But the question I have for you is this:

    Why would each process in the multiprocessing pool require a pool of connections, since log_program_log_to_server appears to be using only a single connection? So the pool initializer function init_pool_processes should be creating simple connections instead of pools of connections. If you must use a connection pool, make the size of the pool 1.