I want to increase the accuracy of my speech-to-text model by using an n-gram language model. I'm using this line of code to apply the function to the whole dataset:
result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core')))
The number of CPU cores I set for 'cpu_core' is 8.
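As an aside, int(os.environ.get('cpu_core')) raises a TypeError if the variable happens to be unset, so a slightly more defensive version of that call could look like this (a sketch; the fallback to os.cpu_count() is my own addition, not part of the original setup):

import os

# Fall back to the machine's core count when 'cpu_core' is not set,
# instead of crashing on int(None).
num_proc = int(os.environ.get('cpu_core', os.cpu_count()))
result = dataset.map(predict, batch_size=5, num_proc=num_proc)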
Here is the predict function:
def predict(batch):
    # batch_decode returns an output object whose .text attribute is a
    # list of decoded transcriptions; take the first (and only) one.
    batch["predicted"] = processor.batch_decode(np.array(batch["logits"])).text[0]
    print(batch["predicted"])
    return batch
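Note that datasets.map only passes batches to the function when batched=True is set; without it, batch_size=5 is ignored and predict receives one example at a time, which is why indexing .text[0] returns the single transcription. A batched variant would have to keep the whole list of decoded strings (a sketch, assuming processor is a Wav2Vec2ProcessorWithLM whose batch_decode returns an output with a .text list, as the original code suggests):

import numpy as np

def predict_batched(batch):
    # With batched=True, batch["logits"] holds the logits of several
    # examples, so keep every decoded string rather than only the first.
    batch["predicted"] = processor.batch_decode(np.array(batch["logits"])).text
    return batch

result = dataset.map(predict_batched, batched=True, batch_size=5)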
I'm calling the dataset.map(predict, ...) line inside a try block, which itself sits inside a while True loop. When the program hits a multiprocessing error, it gets stuck in that loop (the bare except swallows the error and the loop retries forever). Here is the complete code:
while True:
    try:
        dataset = dataset.map(speech_file_to_array_fn)
        # If we're using an n-gram
        if os.environ.get('active_ngram') == '1':
            dataset = dataset.map(predict_model)
            print("\nN-Gram started\n")
            result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core')))  # This is the line that raises the error
    except KeyboardInterrupt:
        print('interrupted!')
        break
    except:
        pass
Now I want to know how I can handle this multiprocessing error (Python 3.8.10 & Ubuntu 20.04.4). Here is the error:
^CProcess ForkPoolWorker-3335:
Process ForkPoolWorker-3330:
Process ForkPoolWorker-19:
Process ForkPoolWorker-3333:
...
(the same traceback was printed once per worker process, interleaved with the progress-bar output; one representative copy follows, and a few workers were instead interrupted inside Connection._recv while reading from the result pipe)
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 355, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
interrupted!
^C
I finally fixed this error. The BrokenPipeError: [Errno 32] Broken pipe comes from the Linux operating system and occurs during I/O. Specifically, it is raised when one end of a read/write pipe on Linux has been closed while the other side of the communication is still trying to read or write data.
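A minimal illustration of that mechanism (my own toy example, unrelated to the dataset code): on Linux, writing into a pipe whose read end has already been closed raises exactly this error.

import multiprocessing as mp

# Create a one-way pipe and immediately close the receiving end.
reader, writer = mp.Pipe(duplex=False)
reader.close()

try:
    writer.send("hello")        # nobody can read this anymore
except BrokenPipeError as err:
    print(err)                  # [Errno 32] Broken pipe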
Now the fun part: I was using 6 workers (the number of CPU cores) in my map function, and the dataset iterable contained 25 items. The map call therefore split the data into 5 shards of 4 files each plus 1 shard of 5 files, and I guess that last, uneven shard of 5 files is what caused the disturbance. So I reduced the number of files in the dataset from 25 to 24, set the number of workers to 6, and removed batch_size=5. That made the length of the data divisible by the number of processes, and the error went away. Here is the link for more information about BrokenPipeError.
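If trimming the dataset is not an option, the same idea can be turned around: pick the largest worker count, up to the CPU budget, that divides the dataset length evenly. This is my own generalization of the fix above, not code from the original post:

import os

def even_num_proc(dataset_len, max_workers):
    # Walk down from the CPU budget and return the first worker count
    # that splits the dataset into equally sized shards.
    for workers in range(max_workers, 0, -1):
        if dataset_len % workers == 0:
            return workers
    return 1

num_proc = even_num_proc(len(dataset), int(os.environ.get('cpu_core')))
result = dataset.map(predict, num_proc=num_proc)

For example, even_num_proc(24, 6) returns 6 (4 files per shard), matching the fix described above, while even_num_proc(25, 6) would fall back to 5 workers of 5 files each.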