pytorch, gunicorn, amazon-sagemaker, pytorch-dataloader

PyTorch DataLoader throws a multiprocessing exception when deployed inside a gunicorn/Flask server


I am deploying my PyTorch model in an AWS SageMaker container and using a gunicorn server for inference.

These are the imports I have.

Dependency versions:

    matplotlib = 3.0
    pandas = 2.0
    PyTorch = 2.0.1
    tqdm = 4.0
    cuda = 11.7

import logging
import os
import shutil
import pandas as pd
import torch
from time import perf_counter

import json

shutil._USE_CP_SENDFILE = False
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from torch import cuda
from awsme import create_cloud_watch
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=False)

device = 'cuda' if cuda.is_available() else 'cpu'
logger = logging.getLogger(__name__)

# model, tokenizer and RelevanceDataset are defined elsewhere in the service
def invoke(request):
    logger.info('Rawinput-_requ: {}'.format(request))
    start_secs = perf_counter()
    request = request["title"]
    batch_size = 10
    num_workers = 2
    # num_workers = 0  # This gives a result without any error

    logger.info('Request: {}'.format(request))
    df_test = pd.read_json(json.dumps(request))
    df_test['standardRelevancy'] = 1
    test_dataset = RelevanceDataset(df_test,
                                    tokenizer=tokenizer,
                                    label_column="standardRelevancy",
                                    max_length=100,
                                    do_lower_case=True)

    test_dataloader = DataLoader(test_dataset,
                                 batch_size=batch_size,
                                 shuffle=False,
                                 num_workers=num_workers,
                                 pin_memory=True)

    emb_scores_list, labels_list = run_eval_epoch(test_dataloader)
    df_test["score"] = emb_scores_list

invoke calls the method below:

def run_eval_epoch(dataloader):
    emb_scores_list = []
    labels_list = []
    model.eval()
    with torch.set_grad_enabled(False):
        for i, batch_data in enumerate(tqdm(dataloader)):
            batch_data = {x: y.to(device) for x, y in batch_data.items()}
            outputs = model(**batch_data, return_dict=True)
            batch_softmax_logits = torch.nn.Sigmoid()(outputs.logits)
            emb_scores_list.extend(batch_softmax_logits[:, -1].data.cpu().numpy().tolist())
            labels_list.extend(batch_data["labels"].cpu().numpy().tolist())
            del outputs
    return emb_scores_list, labels_list

When I call invoke(request), I see the following exception:


   File "/opt/jk/python3.8/lib/python3.8/multiprocessing/resource_sharer.py", line 142, in _serve
    with self._listener.accept() as conn:
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 465, in accept
    deliver_challenge(c, self._authkey)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 740, in deliver_challenge
    response = connection.recv_bytes(256)        # reject large message
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable
  0%|          | 0/1 [00:00<?, ?it/s]
ERROR 2023-10-18 08:00:01,626: invoke:152: Error occurred
Traceback (most recent call last):
  File "/opt/jk/lib/python3.8/site-packages/jk_test_jk_dummy_model_deploy1/service.py", line 142, in invoke
    emb_scores_list, labels_list = run_eval_epoch(test_dataloader)
  File "/opt/jk/lib/python3.8/site-packages/jk_test_jk_dummy_model_deploy1/service.py", line 93, in run_eval_epoch
    for i, batch_data in enumerate(tqdm(dataloader)):
  File "/opt/jk/lib/python3.8/site-packages/tqdm/std.py", line 1195, in __iter__
    for obj in iterable:
  File "/opt/jk/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 633, in __next__
    data = self._next_data()
  File "/opt/jk/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1328, in _next_data
    idx, data = self._get_data()
  File "/opt/jk/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1294, in _get_data
    success, data = self._try_get_data()
  File "/opt/jk/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1132, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
  File "/opt/jk/lib/python3.8/site-packages/torch/multiprocessing/reductions.py", line 307, in rebuild_storage_fd
    fd = df.detach()
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 508, in Client
    answer_challenge(c, authkey)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 752, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/opt/jk/python3.8/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable
240.10.1.5 - - [18/Oct/2023:08:00:01 +0000] "POST /invocations HTTP/1.1" 200 4 "-" "python-requests/2.31.0"

When I set num_workers=0 I get results without any issues. If I set num_workers > 0, I see the error above.

In addition, when I run the same code in a Jupyter notebook, I don't see this issue.

This only happens when I deploy inside gunicorn. Unfortunately I don't control the gunicorn setup; I can only control the PyTorch-related code.

I tried most of the common suggestions from the internet, for example:

    torch.set_num_threads(1)
    set_start_method('spawn')

etc. (roughly applied as in the sketch below)
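
For completeness, this is roughly how I applied those attempts (a sketch of my setup code, not the exact service file); neither of them fixed the error for me:

    import torch
    import torch.multiprocessing as mp

    # Limit intra-op threads; often suggested for inference servers.
    torch.set_num_threads(1)

    # The start method has to be set before any worker process is created;
    # force=True avoids "context has already been set" errors when the module
    # is imported inside a gunicorn worker.
    mp.set_start_method('spawn', force=True)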

Any pointers on this issue would be helpful.

Thanks Jk


Solution

  • Google Bard helped me solve this issue :)

    According to Bard, this is caused by the following:

    Yes, there is a known compatibility issue between PyTorch dataloaders and Gunicorn. This issue is caused by the way that Gunicorn handles multiprocessing.

    Gunicorn uses a worker process model to handle requests. Each worker process has its own copy of the Python interpreter and the PyTorch library. When a dataloader is created in a worker process, it will try to share its data with other worker processes. This can lead to the BlockingIOError: [Errno 11] Resource temporarily unavailable error.

    Bard suggested several possible fixes. The easiest one, which worked for me, was setting this parameter:

    torch.multiprocessing.set_sharing_strategy('file_system')
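
    For reference, this is roughly how I wired in the fix. It is a minimal sketch, not my exact service code; build_dataloader and its arguments are just placeholders. The key point is that set_sharing_strategy is called at module import time, before any DataLoader with num_workers > 0 is created inside the gunicorn worker:

        import torch
        import torch.multiprocessing
        from torch.utils.data import DataLoader

        # Share worker tensors through the filesystem instead of passing file
        # descriptors between processes; the fd-passing handshake is what was
        # failing under gunicorn's worker model.
        torch.multiprocessing.set_sharing_strategy('file_system')

        def build_dataloader(dataset, batch_size=10, num_workers=2):
            # num_workers > 0 is safe here because shared tensors no longer
            # rely on duplicated file descriptors between processes.
            return DataLoader(dataset,
                              batch_size=batch_size,
                              shuffle=False,
                              num_workers=num_workers,
                              pin_memory=True)

    As far as I understand, the file_system strategy backs shared tensors with files in shared memory, so it is also worth checking that the container's /dev/shm is large enough for your batches.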