Tags: python, scikit-learn, scipy, multiprocessing, scipy-optimize-minimize

Multiprocessing stuck using scipy.optimize and scikit-learn DBSCAN on Ubuntu


I am performing an optimization in Python using the 'SLSQP' method of scipy.optimize. To speed up the Jacobian calculation, I applied multiprocessing to the cost function. This cost function calls scikit-learn's DBSCAN clustering algorithm.

The problem is that when DBSCAN runs inside a multiprocessing worker, the process hangs. This only happens when the cost function is parallelized with multiprocessing; it does not happen with ThreadPool.

Specifically, this issue only occurs on Ubuntu; the same code works normally on Windows. I would like advice on how this environment difference matters and why DBSCAN hangs during multiprocessing.

import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool

def cost_function(x):
    print(f"Running cost function for x: {x}")
    # The child process hangs on this DBSCAN call when the cost function
    # runs inside a multiprocessing worker.
    clustering = DBSCAN().fit(np.array([x]))
    cost = np.sum(x**2)
    print(f"Cost for x: {x} is {cost}")
    return cost

def gradient_function(x):
    print(f"Calculating gradient for x: {x}")
    # Parallelizing with Pool triggers the hang; ThreadPool does not.
    with Pool() as pool:
        gradients = pool.map(cost_function, [x])
    print(f"Gradient for x: {x} is {gradients}")
    return np.array(gradients)

x0 = np.array([0.5, 0.5])
print("Starting optimization...")
result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)

print("Optimization result:", result)

Here is simple troubleshooting code; it also gets stuck. Console output:

Starting optimization...
Running cost function for x: [0.5 0.5]
Cost for x: [0.5 0.5] is 0.5
Calculating gradient for x: [0.5 0.5]
Running cost function for x: [0.5 0.5]

Presumably, scipy.optimize performs parallel processing using BLAS for fast computation, and scikit-learn's DBSCAN is documented to parallelize using OpenMP. Could creating a process from within parallel code, and then doing OpenMP parallel processing inside it, be the cause of the problem?


Solution

  • This program combines three things that cannot coexist in a single Python program:

    1. Multiprocessing.
    2. Multithreading inside the parent process.
    3. The fork start method.

    You need to remove one of the three.

    To explain this, let's start with the fork start method. The multiprocessing module needs a way to start new processes; the default on Linux is fork. The fork() syscall makes an exact copy of the process in memory; multiprocessing then uses the copy (the child) to do the work, while the original (the parent) goes on to create more processes.

    An important detail is the way fork() interacts with multithreading. Fork copies the locks held by threads, but it does not copy the threads themselves. Therefore, if fork() happens to run while another thread in the parent process is holding a lock, the child's copy of that lock ends up permanently held, with no thread left to release it.
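
    Here is a minimal sketch of that hazard in isolation (Linux-only, calling os.fork() directly rather than going through multiprocessing):

    import os
    import threading
    import time

    lock = threading.Lock()

    def hold_lock():
        with lock:
            time.sleep(2)  # hold the lock while the fork happens

    threading.Thread(target=hold_lock).start()
    time.sleep(0.1)  # give the thread time to acquire the lock

    pid = os.fork()
    if pid == 0:
        # Child: the lock was copied in the "held" state, but the thread
        # holding it was not copied, so nothing here will ever release it.
        print("child acquired lock:", lock.acquire(timeout=5))  # False
        os._exit(0)
    os.waitpid(pid, 0)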

    Why doesn't this happen on Windows? Windows doesn't support fork(). For that reason, multiprocessing uses a different start method, called spawn. Rather than copying the parent process, spawn starts with a fresh process, then loads all of your code. While this isn't ideal from a performance perspective, it does mean that locks held by the parent process can't affect the child.

    So this gives us Possible Fix #1: Change the start method to spawn by using multiprocessing.set_start_method('spawn').

    import numpy as np
    from scipy.optimize import minimize
    from sklearn.cluster import DBSCAN
    import multiprocessing
    from multiprocessing import Pool
    
    def cost_function(x):
        print(f"Running cost function for x: {x}")
        clustering = DBSCAN().fit(np.array([x]))
        cost = np.sum(x**2)
        print(f"Cost for x: {x} is {cost}")
        return cost
    
    
    def gradient_function(x):
        print(f"Calculating gradient for x: {x}")
        # Toy "gradient": evaluate the cost at two perturbed points in
        # parallel. Not a real finite-difference gradient, but enough to
        # exercise Pool and DBSCAN together.
        with Pool() as pool:
            gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
        print(f"Gradient for x: {x} is {gradients}")
        return np.array(gradients)
    
    if __name__ == "__main__":
        multiprocessing.set_start_method('spawn')
        x0 = np.array([0.5, 0.5])
        print("Starting optimization...")
        result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)
    
        print("Optimization result:", result)
    

    Note: if you use this approach, you'll need to avoid having any module-level code outside a function or an if __name__ == "__main__": block, or it will be re-run whenever spawn creates a child process.
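
    To illustrate with a minimal, self-contained example (the function name here is just for demonstration): under spawn, each child re-imports the main module, so any unguarded module-level code runs once per worker.

    import multiprocessing
    from multiprocessing import Pool

    def work(i):
        return i * i

    # Unguarded module-level code: under spawn, this line runs again in
    # every child process when the main module is re-imported.
    print("module-level code running")

    if __name__ == "__main__":
        multiprocessing.set_start_method('spawn')
        with Pool(2) as pool:
            print(pool.map(work, range(4)))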

    Another approach you could take would be to remove the multithreading. This gives us Possible Fix #2: set the OMP_NUM_THREADS environment variable to 1 to disable multithreading inside DBSCAN.

    import os
    os.environ['OMP_NUM_THREADS'] = '1'
    import numpy as np
    from scipy.optimize import minimize
    from sklearn.cluster import DBSCAN
    from multiprocessing import Pool
    
    
    def cost_function(x):
        print(f"Running cost function for x: {x}")
        clustering = DBSCAN().fit(np.array([x]))
        cost = np.sum(x**2)
        print(f"Cost for x: {x} is {cost}")
        return cost
    
    
    def gradient_function(x):
        print(f"Calculating gradient for x: {x}")
        with Pool() as pool:
            gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
        print(f"Gradient for x: {x} is {gradients}")
        return np.array(gradients)
    
    if __name__ == "__main__":
        x0 = np.array([0.5, 0.5])
        print("Starting optimization...")
        result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)
    
        print("Optimization result:", result)
    

    Warning: the order in which you set the environment variable and import DBSCAN matters. If DBSCAN is imported first, it will ignore OMP_NUM_THREADS. Another, cleaner way to do this is to use the threadpoolctl library. I have a blog post which explains how you would use threadpoolctl and what advantages it has.
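
    For reference, here is a rough sketch of the threadpoolctl approach (assuming the threadpoolctl package is installed; threadpool_limits caps the native OpenMP/BLAS thread pools at runtime, so the import order no longer matters):

    import numpy as np
    from sklearn.cluster import DBSCAN
    from threadpoolctl import threadpool_limits

    def cost_function(x):
        # Cap every native thread pool (OpenMP, BLAS) at one thread for
        # the duration of this block only, rather than process-wide.
        with threadpool_limits(limits=1):
            clustering = DBSCAN().fit(np.array([x]))
        return np.sum(x**2)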

    You might not want to disable multithreading for DBSCAN, though. The problem is not multithreading as such; the problem is multithreading inside the parent process. If the parent process never runs DBSCAN, it never starts threads, so fork remains safe.

    So this gives us Possible Fix #3: only run cost_function() inside of a child process.

    import numpy as np
    from scipy.optimize import minimize
    from sklearn.cluster import DBSCAN
    from multiprocessing import Pool
    
    
    def cost_function(x):
        print(f"Running cost function for x: {x}")
        clustering = DBSCAN().fit(np.array([x]))
        cost = np.sum(x**2)
        print(f"Cost for x: {x} is {cost}")
        return cost
    
    
    def cost_function_subproc(x):
        # Only launch 1 process - that's all we need.
        with Pool(1) as pool:
            return pool.map(cost_function, [x])[0]
    
    
    def gradient_function(x):
        print(f"Calculating gradient for x: {x}")
        with Pool() as pool:
            gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
        print(f"Gradient for x: {x} is {gradients}")
        return np.array(gradients)
    
    if __name__ == "__main__":
        x0 = np.array([0.5, 0.5])
        print("Starting optimization...")
        result = minimize(cost_function_subproc, x0, method='SLSQP', jac=gradient_function)
    
        print("Optimization result:", result)
    

    All of the possible fixes were tested on Ubuntu 22.04. I think any of them would work, but which one is best depends on the problem.