python · multiprocessing · speech-to-text · n-gram

Multiprocessing error when using the map function in Python with an N-Gram language model


I want to improve the accuracy of my speech-to-text model by using an N-Gram language model, so I'm using this line of code to apply the function to the whole dataset:

result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core')))

The value I set for 'cpu_core' is 8 (the number of CPU cores).

Here is the predict function code:

def predict(batch):
    # Decode the logits with the LM-boosted processor and keep
    # the first decoded transcription for this example.
    batch["predicted"] = processor.batch_decode(np.array(batch["logits"])).text[0]
    print(batch["predicted"])
    return batch

This line runs inside a try block, which sits in a while True loop; when the program hits a multiprocessing error, it gets stuck in that loop. Here is the complete code (a variant with the bare except narrowed is sketched after the block):

while True:

    try:
        dataset = dataset.map(speech_file_to_array_fn)

        # If we're using n-gram
        if os.environ.get('active_ngram') == '1':

            dataset = dataset.map(predict_model)

        print("\nN-Gram started\n")
        result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core'))) # This is the line that raises the error

    except KeyboardInterrupt:
        print('interrupted!')
        break  

    except:
        pass
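
For illustration, here is the same loop with the bare except narrowed, so a persistent failure is reported and eventually aborts instead of spinning forever. This is only a sketch; catching OSError (of which BrokenPipeError is a subclass) is an assumption based on the traceback below, and max_retries is an arbitrary illustrative limit:

max_retries = 3

for attempt in range(max_retries):
    try:
        dataset = dataset.map(speech_file_to_array_fn)

        # If we're using n-gram
        if os.environ.get('active_ngram') == '1':
            dataset = dataset.map(predict_model)

        print("\nN-Gram started\n")
        result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core')))
        break  # success: leave the retry loop

    except KeyboardInterrupt:
        print('interrupted!')
        break

    except OSError as e:  # BrokenPipeError is a subclass of OSError
        # Report the failure instead of silently retrying forever.
        print(f'map failed on attempt {attempt + 1}/{max_retries}: {e!r}')
else:
    raise RuntimeError('dataset.map kept failing after all retries')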

Now I want to know how I can handle this multiprocessing error. (Python 3.8.10 & Ubuntu 20.04.4)

Here is the error:

^CProcess ForkPoolWorker-3335:
Process ForkPoolWorker-3330:
Process ForkPoolWorker-3331:
...
Process ForkPoolWorker-20:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 355, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

(The same KeyboardInterrupt traceback repeats for each of the ForkPoolWorker processes, interleaved with progress-bar output such as "#0: 25%| ... | 1/4 [14:09:32<42:28:38, 50972.67s/ex]". One worker fails further down the queue path instead:)

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 356, in get
    res = self._reader.recv_bytes()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

interrupted!
^C


Solution

  • I finally fixed this error. The BrokenPipeError: [Errno 32] Broken pipe comes from the Linux operating system and occurs during I/O tasks. Specifically, when one side of a read/write pipe on Linux is closed while the other side of the communication is still trying to read or write data, this error is raised (a minimal reproduction is sketched after this answer).

    Now the fun part: I was running the map function with 6 workers, which was the number of CPU cores on my machine, over a dataset of 25 items. So the map work was split into 5 chunks of 4 files each and 1 chunk of 5 files. I suspect that uneven last chunk of 5 files was the cause of the error. So I reduced the number of files in the dataset from 25 to 24, kept the number of workers at 6, and removed batch_size=5. This made the length of the data divisible by the number of processes and made the error go away (see the sketches below). Here is the link for more information about the BrokenPipeError.
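
To make the failure mode concrete, here is a minimal sketch (not from the original setup) that reproduces the same BrokenPipeError by closing the read end of a multiprocessing pipe while the write end still tries to send:

import multiprocessing as mp

# One side of the communication goes away...
reader, writer = mp.Pipe(duplex=False)
reader.close()

# ...while the other side still tries to write into the pipe.
try:
    writer.send('payload')
except BrokenPipeError as e:
    print(repr(e))  # BrokenPipeError(32, 'Broken pipe')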
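
And here is a sketch of the workaround itself, generalized: instead of hand-trimming 25 files down to 24, the dataset can be cut to the nearest multiple of the worker count. The select-based trimming is an illustration, not the exact code from this fix:

num_proc = 6  # number of workers, as above

# Trim the dataset so its length is divisible by the number of
# processes (this mirrors the 25 -> 24 reduction described above).
usable = (len(dataset) // num_proc) * num_proc
dataset = dataset.select(range(usable))

result = dataset.map(predict, num_proc=num_proc)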