pythonnumpyparallel-processingmultiprocessingviterbi

Using multiprocessing module to runs parallel processes where one is fed (dependent) by the other for Viterbi Algorithm


I have recently played around with Python's multiprocessing module to speed up the forward-backward algorithm for Hidden Markov Models as forward filtering and backward filtering can run independently. Seeing the run-time halve was awe-inspiring stuff.

I now attempt to include some multiprocessing in my iterative Viterbi algorithm.In this algorithm, the two processes I am trying to run are not independent. The val_max part can run independently but arg_max[t] depends on val_max[t-1]. So I played with the idea that one can run val_max as a separate process and then arg_max also as a separate process which can be fed by val_max.

I admit to be a bit out of my depth here and do not know much about multiprocessing other than watching some basic video's on it as well as browsing blogs. I provide my attempt below, but it does not work.


import numpy as np
from time import time,sleep
import multiprocessing as mp

class Viterbi:


    def __init__(self,A,B,pi):
        self.M = A.shape[0] # number of hidden states
        self.A = A  # Transition Matrix
        self.B = B   # Observation Matrix
        self.pi = pi   # Initial distribution
        self.T = None   # time horizon
        self.val_max = None
        self.arg_max = None
        self.obs = None
        self.sleep_time = 1e-6
        self.output = mp.Queue()


    def get_path(self,x):
        # returns the most likely state sequence given observed sequence x
        # using the Viterbi algorithm
        self.T = len(x)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        self.val_max[0] = self.pi*self.B[:,x[0]]
        for t in range(1, self.T):
            # Indepedent Process
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0  ) 
            # Dependent Process
            self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)

        # BACKTRACK
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(self.val_max[self.T-1])
        for t in range(self.T-2, -1, -1):
            states[t] = self.arg_max[t+1, states[t+1]]
        return states

    def get_val(self):
        '''Independent Process'''
        for t in range(1,self.T):
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,self.obs[t]]) , axis = 0  ) 
        self.output.put(self.val_max)

    def get_arg(self):
        '''Dependent Process'''
        for t in range(1,self.T):
            while 1:
                # Process info if available
                if self.val_max[t-1].any() != 0:
                    self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
                    break
                # Else sleep and wait for info to arrive
                sleep(self.sleep_time)
        self.output.put(self.arg_max)

    def get_path_parallel(self,x):
        self.obs = x
        self.T = len(obs)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        val_process = mp.Process(target=self.get_val)
        arg_process = mp.Process(target=self.get_arg)  
        # get first initial value for val_max which can feed arg_process
        self.val_max[0] = self.pi*self.B[:,obs[0]]
        arg_process.start()
        val_process.start()
        arg_process.join()
        val_process.join()

Note: get_path_parallel does not have backtracking yet.

It would seem that val_process and arg_process never really run. Really not sure why this happens. You can run the code on the Wikipedia example for the viterbi algorithm.

obs = np.array([0,1,2])  # normal then cold and finally dizzy  

pi = np.array([0.6,0.4])

A = np.array([[0.7,0.3],
             [0.4,0.6]])

B = np.array([[0.5,0.4,0.1],
             [0.1,0.3,0.6]]) 

viterbi = Viterbi(A,B,pi)
path = viterbi.get_path(obs)

I also tried using Ray. However, I had no clue what I was really doing there. Can you please help recommend me what to do in order to get the parallel version to run. I must be doing something very wrong but I do not know what.

Your help would be much appreciated.


Solution

  • Welcome to SO. Consider taking a look at producer-consumer pattern that is heavily used in multiprocessing.

    Beware that multiprocessing in Python reinstantiates your code for every process you create on Windows. So your Viterbi objects and therefore their Queue fields are not the same.

    Observe this behaviour through:

    import os
    
    def get_arg(self):
        '''Dependent Process'''
        print("Dependent ", self)
        print("Dependent ", self.output)
        print("Dependent ", os.getpid())
    
    def get_val(self):
        '''Independent Process'''
        print("Independent ", self)
        print("Independent ", self.output)
        print("Independent ", os.getpid())
    
    if __name__ == "__main__":
        print("Hello from main process", os.getpid())
        obs = np.array([0,1,2])  # normal then cold and finally dizzy  
    
        pi = np.array([0.6,0.4])
    
        A = np.array([[0.7,0.3],
                 [0.4,0.6]])
    
        B = np.array([[0.5,0.4,0.1],
                 [0.1,0.3,0.6]]) 
    
        viterbi = Viterbi(A,B,pi)
        print("Main viterbi object", viterbi)
        print("Main viterbi object queue", viterbi.output)
        path = viterbi.get_path_parallel(obs)
    

    There are three different Viterbi objects as there are three different processes. So, what you need in terms of parallelism is not processes. You should explore the threading library that Python offers.