
Out of Memory with RAY Python Framework


I have created a simple remote function with Ray that uses very little memory. However, after it runs for a short time, memory usage increases steadily and I get a RayOutOfMemoryError exception.

The following code is a VERY simple example of this problem. The "result_transformed" numpy array is sent to the workers so that each worker can operate on it. My simplified calc_similarity function does nothing, but the program still runs out of memory. I have added much longer sleep times to that function to simulate more work, but it still eventually runs out of memory.

I am running on an 8-core Intel 9900K with 32 GB of RAM and Ubuntu 19.10. Python is the Intel Python Distribution 3.7.4, and NumPy is 1.17.4 (with Intel MKL).

import numpy as np
from time import sleep
import ray
import psutil

@ray.remote
def calc_similarity(sims, offset):
    # Fake some work for 100 ms.
    sleep(0.10)
    return True

if __name__ == "__main__":
    # Initialize RAY to use all of the processors.
    num_cpus = psutil.cpu_count(logical=False)
    ray.init(num_cpus=num_cpus)

    num_docs = 1000000
    num_dimensions = 300
    chunk_size = 128
    sim_pct = 0.82

    # Initialize the array
    index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32)
    index_array = np.arange(num_docs).reshape(1, num_docs)
    index_array_id = ray.put(index_array)

    calc_results = []

    for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)):
        # Number of rows in this chunk (the last chunk may be shorter than chunk_size).
        size = min(chunk_size, num_docs - start_doc_no)
        # Get the query vector out of the index.
        query_vector = index[start_doc_no:start_doc_no+size]
        # Calculate the matrix multiplication.
        result_transformed = np.matmul(index, query_vector.T).T
        # Serialize the result matrix out for each client.
        result_id = ray.put(result_transformed)

        # Simulate multi-threading extracting the results of a cosine similarity calculation
        for offset in range(size):
            calc_results.append(calc_similarity.remote(sims=result_id, offset=offset ))
            # , index_array=index_array_id))
        res = ray.get(calc_results)
        calc_results.clear()
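For scale, here is a rough sizing sketch of the arrays in the script above (plain NumPy, no Ray involved; it only reuses the script's shapes). Each `result_transformed` is a `chunk_size × num_docs` float32 matrix, so every `ray.put` copies about 512 MB into the object store, and over the whole loop about 4 TB is put in total. The loop can only finish if older results are actually evicted.

```python
import numpy as np

num_docs, num_dimensions, chunk_size = 1_000_000, 300, 128

# Small stand-in arrays confirm the shape and dtype of result_transformed
# without allocating gigabytes: (rows in chunk, num_docs), still float32.
small_index = np.zeros((10, num_dimensions), dtype=np.float32)
small_result = np.matmul(small_index, small_index[0:4].T).T
print(small_result.shape, small_result.dtype)  # (4, 10) float32

itemsize = np.dtype(np.float32).itemsize            # 4 bytes per element
index_bytes = num_docs * num_dimensions * itemsize  # the full index held by the driver
result_bytes = num_docs * chunk_size * itemsize     # one result_transformed per ray.put
num_chunks = len(range(0, num_docs, chunk_size))    # loop iterations
# Chunk sizes add up to num_docs, so all puts together hold num_docs * num_docs elements.
total_put_bytes = num_docs * num_docs * itemsize

print(index_bytes)      # 1200000000
print(result_bytes)     # 512000000
print(num_chunks)       # 7813
print(total_put_bytes)  # 4000000000000
```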

Any help/guidance would be greatly appreciated.


Solution

  • Currently, Ray only partially supports reference counting (full reference counting will be released soon). Simply put, when the object ID passed to a remote function is not serialized, it is reference counted the same way Python objects are. This means that once the object ID of result_transformed (result_id) is garbage collected by Python, the corresponding object in the plasma store should be unpinned, and it can then be removed by the store's LRU eviction. (To be clear, pinned objects, i.e. objects that still have references, are not evicted.)

    I also suspected some unusual reference counting problem, such as circular references. However, when I ran the script below, I could verify that result_transformed was evicted, so I don't think result_transformed itself is the problem. There are many other possible causes. In my case, I found that IPython keeps references to Python objects through its input/output history (for example, when you display the value of an object, Out[number] can hold a reference to it).

    In [2]: import psutil 
       ...: import gc 
       ...: import ray 
       ...: from time import sleep 
       ...: import numpy as np 
       ...: @ray.remote 
       ...: def calc_similarity(sims, offset): 
       ...:     # Fake some work for 100 ms. 
       ...:     sleep(0.10) 
       ...:     return True 
       ...:  
       ...: if __name__ == "__main__": 
       ...:     # Initialize RAY to use all of the processors. 
       ...:     num_cpus = psutil.cpu_count(logical=False) 
       ...:     ray.init(num_cpus=num_cpus) 
       ...:  
       ...:     num_docs = 1000000 
       ...:     num_dimensions = 300 
       ...:     chunk_size = 128 
       ...:     sim_pct = 0.82 
       ...:  
       ...:     # Initialize the array 
       ...:     index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32) 
       ...:     index_array = np.arange(num_docs).reshape(1, num_docs) 
       ...:     index_array_id = ray.put(index_array) 
       ...:  
       ...:     calc_results = [] 
       ...:     i = 0 
       ...:     for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)): 
       ...:         i += 1 
       ...:         size = min( chunk_size, num_docs - (start_doc_no) + 1 ) 
       ...:         # Get the query vector out of the index. 
       ...:         query_vector = index[start_doc_no:start_doc_no+size] 
       ...:         # Calculate the matrix multiplication. 
       ...:         result_transformed = np.matmul(index, query_vector.T).T 
       ...:         # Serialize the result matrix out for each client. 
       ...:         result_id = ray.put(result_transformed) 
       ...:         if i == 1: 
       ...:             # Store the binary ID of the first result_id in first_object_id.
       ...:             # That way, we can verify whether this object is evicted after filling up
       ...:             # the plasma store with large numpy arrays.
       ...:             # If it is not evicted, it is still pinned, meaning it is
       ...:             # not properly reference counted.
       ...:             first_object_id = result_id.binary() 
       ...:         # Simulate multi-threading extracting the results of a cosine similarity calculation 
       ...:         for offset in range(chunk_size): 
       ...:             calc_results.append(calc_similarity.remote(sims=result_id, offset=offset )) 
       ...:             # , index_array=index_array_id)) 
       ...:         res = ray.get(calc_results) 
       ...:         calc_results.clear() 
       ...:         print('ref count to result_id {}'.format(len(gc.get_referrers(result_id)))) 
       ...:         print('Total number of ref counts in a ray cluster. {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts())) 
       ...:         if i == 5: 
       ...:             break 
       ...:     # first_object_id holds the binary ID of the first result_transformed object.
       ...:     print('first object id: {}'.format(first_object_id)) 
       ...:     print('fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.') 
       ...:     print('because if the data_transformed is garbage collected properly, it should be unpinned from plasma store') 
       ...:     print('and when plasma store is filled by numpy array, first_object_id should be evicted.') 
       ...:     for _ in range(40): 
       ...:         import numpy as np 
       ...:         ray.put(np.zeros(500 * 1024 * 1024, dtype=np.uint8)) 
       ...:     print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts())) 
       ...:     # this should fail as first_object_id is already evicted 
       ...:     print(ray.get(ray.ObjectID(first_object_id))) 
    
    [ray] Forcing OMP_NUM_THREADS=1 to avoid performance degradation with many workers (issue #6998). You can override this by explicitly setting OMP_NUM_THREADS.
    2020-02-12 00:10:11,932 INFO resource_spec.py:212 -- Starting Ray with 4.35 GiB memory available for workers and up to 2.19 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
    2020-02-12 00:10:12,273 INFO services.py:1080 -- View the Ray dashboard at localhost:8265
    2020-02-12 00:10:18,522 WARNING worker.py:289 -- OMP_NUM_THREADS=1 is set, this may slow down ray.put() for large objects (issue #6998).
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008002000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008003000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008004000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008005000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    first object id: b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x80\x02\x00\x00\x00'
    fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.
    because if the data_transformed is garbage collected properly, it should be unpinned from plasma store
    and when plasma store is filled by numpy array, first_object_id should be evicted.
    total ref count from a ray cluster after eviction: {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    2020-02-12 00:10:57,108 WARNING worker.py:1515 -- Local object store memory usage:
    num clients with quota: 0
    quota map size: 0
    pinned quota map size: 0
    allocated bytes: 2092865189
    allocation limit: 2347285708
    pinned bytes: 520000477
    (global lru) capacity: 2347285708
    (global lru) used: 67.0078%
    (global lru) num objects: 4
    (global lru) num evictions: 41
    (global lru) bytes evicted: 21446665725
    
    2020-02-12 00:10:57,112 WARNING worker.py:1072 -- The task with ID ffffffffffffffffffffffff0100 is a driver task and so the object created by ray.put could not be reconstructed.
    ---------------------------------------------------------------------------
    UnreconstructableError                    Traceback (most recent call last)
    <ipython-input-1-184e5836123c> in <module>
         63     print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts()))
         64     # this should fail as first_object_id is already evicted
    ---> 65     print(ray.get(ray.ObjectID(first_object_id)))
         66 
    
    ~/work/ray/python/ray/worker.py in get(object_ids, timeout)
       1517                     raise value.as_instanceof_cause()
       1518                 else:
    -> 1519                     raise value
       1520 
       1521         # Run post processors.
    
    UnreconstructableError: Object ffffffffffffffffffffffff0100008002000000 is lost (either LRU evicted or deleted by user) and cannot be reconstructed. Try increasing the object store memory available with ray.init(object_store_memory=<bytes>) or setting object store limits with ray.remote(object_store_memory=<bytes>). See also: https://ray.readthedocs.io/en/latest/memory-management.html
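The pinning behavior described in this answer, and the IPython history pitfall, can be illustrated with a small toy model in plain Python. This is a hypothetical sketch, not Ray's implementation or API: `ToyObjectStore`, `ToyRef` and `out_history` are made-up names. The model assumes that an object stays pinned while a Python handle to it is alive and becomes evictable once that handle is garbage collected, with recency approximated by insertion order. It relies on CPython freeing an object as soon as its last reference disappears.

```python
import weakref
from collections import OrderedDict


class ToyRef:
    """Hypothetical stand-in for the object ID handle returned by ray.put()."""

    def __init__(self, key):
        self.key = key


class ToyObjectStore:
    """Toy model (not Ray's code): objects stay pinned while a handle is alive
    and become evictable, oldest first, once that handle is collected."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.objects = OrderedDict()  # key -> size, oldest first
        self.pinned = set()           # keys whose handle is still alive
        self._next_key = 0

    def used(self):
        return sum(self.objects.values())

    def put(self, size):
        # Evict unpinned objects, oldest first, until the new object fits.
        for key in list(self.objects):
            if self.used() + size <= self.capacity:
                break
            if key not in self.pinned:
                del self.objects[key]
        if self.used() + size > self.capacity:
            raise MemoryError("object store full: every object is still pinned")
        key = self._next_key
        self._next_key += 1
        self.objects[key] = size
        self.pinned.add(key)
        ref = ToyRef(key)
        # When the handle is garbage collected, unpin the object (it stays stored).
        weakref.finalize(ref, self.pinned.discard, key)
        return ref


# 1) The question's loop: rebinding result_id drops the previous handle.
loop_store = ToyObjectStore(capacity=4)
for _ in range(10):
    result_id = loop_store.put(1)
print(sorted(loop_store.objects))  # [6, 7, 8, 9]

# 2) A history dict (standing in for IPython's Out) keeps every handle alive.
ipython_store = ToyObjectStore(capacity=4)
out_history = {}
stored_before_error = None
try:
    for n in range(10):
        result_id = ipython_store.put(1)
        out_history[n] = result_id
except MemoryError as exc:
    print(exc)  # object store full: every object is still pinned
    stored_before_error = len(out_history)

out_history.clear()  # drop the history references
new_id = ipython_store.put(1)  # succeeds: unpinned objects can be evicted again
print(sorted(ipython_store.objects))  # [1, 2, 3, 4]
```

In the first loop only the newest results survive, because each rebinding of `result_id` releases the previous handle. In the second, the dict plays the role of IPython's `Out` history and pins every result until it is cleared; in a real IPython session, the `%reset -f out` magic should clear the output history.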
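The byte counts in the object store log above are consistent with this reading. The following is a rough back-of-the-envelope check, not Ray output. It assumes `np.arange` produced 8-byte `int64` values (the default on 64-bit Linux) and attributes the small leftovers to per-object metadata, which the log does not state. Under those assumptions, the pinned bytes match `index_array` plus the last `result_transformed`, the evicted bytes match the four older results plus 37 of the 40 filler arrays, and the three remaining fillers account for the logged LRU usage.

```python
import numpy as np

# Object sizes implied by the script (assumption: index_array is int64).
result_bytes = 1_000_000 * 128 * np.dtype(np.float32).itemsize  # one result_transformed
index_array_bytes = 1_000_000 * np.dtype(np.int64).itemsize     # index_array
fill_bytes = 500 * 1024 * 1024                                 # one np.zeros(..., uint8) filler

# Values copied from the object store log above.
logged_pinned = 520_000_477
logged_evicted = 21_446_665_725
logged_evictions = 41
logged_capacity = 2_347_285_708

# Still pinned: index_array plus the last result_transformed.
pinned_estimate = index_array_bytes + result_bytes
# Evicted: the 4 older results plus 37 of the 40 fillers.
evicted_estimate = 4 * result_bytes + 37 * fill_bytes
# Remaining in the LRU cache: the last 3 fillers.
lru_used_pct = 100 * 3 * fill_bytes / logged_capacity

print(pinned_estimate, logged_pinned - pinned_estimate)     # 520000000 477
print(evicted_estimate, logged_evicted - evicted_estimate)  # 21446656000 9725
print(4 + 37 == logged_evictions)                           # True
print(lru_used_pct)  # close to the logged 67.0078%
```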