I am trying to set up a pool of worker processes in which each process has its own set of database connection pools.
This is the class definition:
class MultiprocessingManager:
    """Process-wide singleton that owns a multiprocessing ``Pool``.

    Each worker process in the pool is initialized once by ``_init_worker``,
    so per-worker resources (such as database connection pools) are created a
    single time per process rather than on every task.
    """

    _instance = None  # Class variable holding the singleton instance

    def __new__(cls, local_ip=None, linked_ip=None, server_ip=None):
        """Return the single shared instance, creating it on first use.

        The IP arguments are accepted only so ``MultiprocessingManager(...)``
        can forward them to ``__init__``; they are not used here. They default
        to ``None`` because pickle rebuilds instances by calling
        ``cls.__new__(cls)`` with no extra arguments. That happens when the
        bound ``self._init_worker`` is sent to workers under the 'spawn' start
        method.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every MultiprocessingManager(...) call; this flag
            # makes it build the pool only the first time.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, local_ip, linked_ip, server_ip):
        """Create the worker pool on first construction; later calls are no-ops.

        Args:
            local_ip: IP forwarded to the worker initializer for the local
                connection.
            linked_ip: IP forwarded to the worker initializer for the linked
                connection.
            server_ip: IP forwarded to the worker initializer for the server
                connection.
        """
        if self._initialized:
            return
        print("Initializing MultiprocessingManager instance...")
        # Pass the initializer *function* and its arguments separately. Calling
        # it here (``initializer=self._init_worker(...)``) would run it once in
        # the parent process and hand ``Pool`` its return value (``None``), so
        # the worker processes would never be initialized.
        self.pool = Pool(
            processes=multiprocessing.cpu_count(),
            initializer=self._init_worker,
            initargs=(local_ip, linked_ip, server_ip),
        )
        self._initialized = True
        print("MultiprocessingManager instance initialized.")
And this is the `_init_worker` function, which tries to set up the connections:
def _init_worker(self, local_ip, linked_ip, server_ip):
    """Pool initializer: give this worker process its own DatabaseManagers.

    Intended to be passed (uncalled) as ``Pool(initializer=...)`` with the IPs
    in ``initargs``, so that it runs once in each worker process. The three
    managers are published as module-level globals (``local_connection``,
    ``linked_connection``, ``server_connection``) so task functions running in
    the same worker can reuse them. Plain local variables would be discarded as
    soon as this function returned.

    ``global`` binds names in the module where this function is defined. Task
    functions that live in other modules must reach them through this module
    (e.g. ``this_module.server_connection``).

    Args:
        local_ip: Host/database IP for the local connection pool.
        linked_ip: Host/database IP for the linked connection pool.
        server_ip: Host/database IP for the server connection pool.
    """
    global local_connection, linked_connection, server_connection

    def _manager(ip, pool_name):
        # The three managers differ only in target IP and pool name.
        return DatabaseManager(
            host_ip=ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=ip,
            db_port=3306,
            db_pool_name=pool_name,
            db_pool_size=5,
        )

    local_connection = _manager(local_ip, 'local_db_pool')
    linked_connection = _manager(linked_ip, 'linked_db_pool')
    server_connection = _manager(server_ip, 'server_db_pool')
In the `DatabaseManager` class, this is how I set up the connections to MySQL:
# Excerpt from DatabaseManager's constructor (the enclosing ``def`` is not shown).
print('DatabaseManager instantiation...')
self.host_ip = host_ip
self.host_username = host_username
self.db_password = db_password
self.db_name = db_name
self.db_ip = db_ip
self.db_port = db_port
# connect() on a UDP (SOCK_DGRAM) socket only records the default destination.
# No packet is sent, so this does not verify that MySQL is reachable.
# NOTE(review): how self.socket is used is not visible here; confirm it is
# needed, otherwise it is an extra open socket per manager.
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.connect((self.db_ip, self.db_port))
self.db_connection = mysql.connector.pooling.MySQLConnectionPool(
    pool_name=db_pool_name,
    pool_size=db_pool_size,
    pool_reset_session=True,
    host=self.host_ip,
    # db_port was stored above but previously never passed on, so the
    # driver's default port was always used regardless of the argument.
    port=self.db_port,
    user=self.host_username,
    password=self.db_password,
    database=self.db_name,
)
Regarding multiprocessing: the worker functions were written in separate files, and each one creates its own database connections, for example:
def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    """Write one entry to the server's master program log table.

    The server ``DatabaseManager`` is cached in the module-level global
    ``server_connection``, so it is created at most once per process instead of
    on every call. If a pool initializer has already set that global in this
    process, it is reused as-is.
    """
    global server_connection
    connection = globals().get('server_connection')
    if connection is None:
        # First call in this process: build the manager and keep it for reuse.
        # NOTE(review): if the parent process calls this before forking a Pool,
        # the children inherit this object and its open sockets. Prefer
        # creating connections in the pool initializer.
        connection = DatabaseManager(
            host_ip=server_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=server_ip,
            db_port=3306,
            db_pool_name='server_db_pool',
            db_pool_size=5,
        )
        server_connection = connection
    connection.write_master_program_log_table(status_code, parameters, log_datetime)
These functions are then used in the multiprocessing context like this:
def log_status_code_to_server(self, status_code: StatusCodes, params: Optional[dict], timestamp: datetime):
    """Queue a status-code log write on the worker pool without blocking.

    Args:
        status_code: Status code forwarded to ``log_program_log_to_server``.
        params: Extra parameters forwarded to ``log_program_log_to_server``.
        timestamp: Time of the event, forwarded as ``log_datetime``.

    Returns:
        The ``AsyncResult`` from ``apply_async``. Callers may ignore it, or call
        ``.get()`` to wait for the write to finish.
    """
    print("logging...")

    def _report_failure(exc):
        # Called in this (parent) process if the worker task raises. Without an
        # error_callback, the exception would sit on the discarded AsyncResult
        # and never be seen.
        print(f"failed to log status code {status_code!r} to server: {exc!r}")

    result = self.pool.apply_async(
        log_program_log_to_server,
        args=(status_code, params, timestamp),
        error_callback=_report_failure,
    )
    # At this point the write has only been queued, not completed.
    print("logged!")
    return result
But this isn't ideal, since the connections are recreated every time this function is called. Instead, each process in the multiprocessing pool should be initialized with its own database connections already in place.
So my question is: if I initialize the workers using the `_init_worker` function above, how exactly do I make use of those connection pools?
Any help appreciated!
What I would do is create a pool initializer that defines your connection pools as global variables, for example:
def init_pool_processes(local_ip, linked_ip, server_ip, db_pool_size=5):
    """Pool initializer: create this worker process's DatabaseManagers.

    Pass this function itself (not a call to it) as ``Pool(initializer=...)``
    with the IPs in ``initargs``; it then runs once in every worker process.
    The managers are stored as module-level globals so task functions running
    in the same worker can reuse them instead of reconnecting on each call.

    Args:
        local_ip: IP for the local DatabaseManager.
        linked_ip: IP for the linked DatabaseManager.
        server_ip: IP for the server DatabaseManager.
        db_pool_size: Connections per manager's pool. If each task uses only
            one connection at a time, 1 is enough.
    """
    global local_connection, linked_connection, server_connection

    def _manager(ip, pool_name):
        # The three managers differ only in target IP and pool name.
        return DatabaseManager(
            host_ip=ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=ip,
            db_port=3306,
            db_pool_name=pool_name,
            db_pool_size=db_pool_size,
        )

    local_connection = _manager(local_ip, 'local_db_pool')
    linked_connection = _manager(linked_ip, 'linked_db_pool')
    server_connection = _manager(server_ip, 'server_db_pool')
Then your MultiprocessingManager.__init__
method would look like:
...
# Pass the initializer function itself; Pool calls it with ``initargs`` once in
# each worker process. Writing ``init_pool_processes(...)`` here would call it
# immediately in the parent and pass its return value (None) as the initializer.
self.pool = Pool(
    processes=multiprocessing.cpu_count(),
    initializer=init_pool_processes,
    initargs=(local_ip, linked_ip, server_ip),
)
...
Finally, your worker function becomes:
def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    """Write one row to the master program log using this worker's server connection.

    ``server_connection`` is a module-level global that the pool initializer
    (``init_pool_processes``) sets once per worker process, so no connection is
    opened per call.
    """
    conn = server_connection
    conn.write_master_program_log_table(
        status_code,
        parameters,
        log_datetime,
    )
Why would each process in the multiprocessing pool need a pool of connections, when `log_program_log_to_server` appears to use only a single connection? The pool initializer function `init_pool_processes` should therefore create simple connections instead of pools of connections. If you must use a connection pool, make its size 1.