I'm starting to use Ray to parallelize a number of task-parallel jobs, i.e. each task takes in an object from a data frame and returns a list. Within the function, however, there is a check for a property of the object, and if that property is fulfilled I want the task to be cancelled gracefully. (I know one could hack around this by setting the retries per task to 0 and then setting the number of permitted task failures to infinity.)
The structure of the function is sort of like this:
import ray
import numpy as np

ray.init()

@ray.remote
def test_function(x):
    if np.random.rand() < 0.5:
        raise Exception("blub")
    return [x, x*x, "The day is blue"]

futures = [test_function.remote(i) for i in range(10000)]
print(ray.get(futures))
Is there a best-practice, or a graceful way to let the individual tasks fail?
reproducible

One way to reproduce the effect with toy data would be to take sha1(i) as i ranges from zero to thirty-five thousand. Mask off some low-order bits, and if the result is "small" report an error, otherwise report success. Using modulo on that can also be convenient.
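A minimal sketch of that toy reproduction, with a hypothetical bit mask and threshold chosen so that roughly 1 in 32 inputs fails (a few percent, in the same ballpark as the ~1k out of 35k mentioned below):

import hashlib

def toy_task(i):
    # Deterministic pseudo-random failures: hash the index and inspect the low-order bits.
    digest = hashlib.sha1(str(i).encode()).digest()
    if (digest[-1] & 0x1F) == 0:      # "small" masked value -> roughly 1 in 32 inputs fails
        raise Exception(f"simulated failure for i={i}")
    return [i, i * i, "ok"]

for i in range(35_000):
    try:
        toy_task(i)
    except Exception:
        pass                          # a few percent of calls land here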
When I run it across all cores, Ray reattempts failed tasks 3 times, and after 24 retries across the entire job it cancels the entire job queue. Out of the 35k samples in the dataset, this affects ~1k samples.
OK, now your question makes sense.

You're explaining that, for the current Ray configuration, it is unacceptable for any of those one thousand samples to report an error that Ray sees. So wrap your function in a try/except, log what happened so you can chase it down later, and return successfully so Ray won't retry. You can do that by making the function a little longer, or by writing a @suppress_error decorator that accomplishes the same thing.
import functools
import logging
import sys

def suppress_error(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Record the failure for later investigation, but don't let Ray see it.
            diagnostic = f"Error in {func.__name__}: {e}"
            logger = logging.getLogger(__name__)
            logger.warning(diagnostic)
            print(diagnostic, file=sys.stderr)
            return []  # zero result rows
    return wrapper
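Applied to the toy function from the question, the decorator goes underneath @ray.remote, so that what Ray wraps is already the error-suppressing version:

@ray.remote
@suppress_error                       # applied first, so Ray only ever sees a successful return
def test_function(x):
    if np.random.rand() < 0.5:
        raise Exception("blub")
    return [x, x*x, "The day is blue"]

futures = [test_function.remote(i) for i in range(10000)]
results = ray.get(futures)            # failed tasks come back as [] instead of raising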
Presumably you adopted the "3 retries" setting in hopes of dealing with transient webserver TCP timeouts or perhaps the occasionally full filesystem.
Consider defining your "expensive" function such that it always serializes its results to a unique filename in some folder, writing zero or more result rows. Upon failure, write zero rows. Then a 2nd-phase task can come along to aggregate those results.
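A rough sketch of that two-phase pattern, assuming the workers can all see one shared results/ folder (a single machine or a network filesystem) and reusing the toy computation from the question; the folder name and file-naming scheme here are illustrative:

import json
import os
import uuid

import numpy as np
import ray

ray.init()

RESULTS_DIR = "results"               # illustrative; any folder all workers can reach works
os.makedirs(RESULTS_DIR, exist_ok=True)

@ray.remote
def expensive_task(x):
    try:
        if np.random.rand() < 0.5:    # stand-in for the real expensive work
            raise Exception("blub")
        rows = [[x, x * x, "The day is blue"]]
    except Exception:
        rows = []                     # failure: write zero result rows
    path = os.path.join(RESULTS_DIR, f"task_{x}_{uuid.uuid4().hex}.json")
    with open(path, "w") as f:
        json.dump(rows, f)
    return path

@ray.remote
def aggregate(paths):
    # Second phase: fold every per-task file into one flat list of rows.
    all_rows = []
    for path in paths:
        with open(path) as f:
            all_rows.extend(json.load(f))
    return all_rows

paths = ray.get([expensive_task.remote(i) for i in range(100)])
print(ray.get(aggregate.remote(paths)))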
one could hack around it by setting the retries per task to 0 and then setting the number of permitted task failures to infinity
It's unclear why you didn't adopt that "no code" solution, since changing the config sounds easier than changing the code. Maybe there are still transient webserver timeouts you're worried about?
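For reference, the config-only route would look roughly like this (max_retries is the task-level retry knob in current Ray; the "infinite permitted failures" part then amounts to catching the error yourself at ray.get time):

@ray.remote(max_retries=0)            # never retry this task on application errors
def test_function(x):
    if np.random.rand() < 0.5:
        raise Exception("blub")
    return [x, x*x, "The day is blue"]

futures = [test_function.remote(i) for i in range(10000)]
results = []
for future in futures:
    try:
        results.append(ray.get(future))
    except Exception:                 # Ray re-raises the task's error here
        results.append([])            # tolerate the failure: zero result rows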
More generally, there are assumptions in your code + config which impact whether an analysis job will ever complete, and their implications are not immediately apparent. Write down, in your source code repository, the assumptions, their implications, and observed results such as timings. This will help future maintenance engineers, such as yourself, to better reason about "what is good?" in the current setup, and what might be changed to improve the setup.
I'm hesitant to adopt the "no code" solution because, while this configuration might work on one cluster, it is highly uncertain whether it will still work on another cluster.
Ray has well-documented retry behavior, which is triggered by your application code raising an error that Ray sees. It is important for your application to behave correctly, raising visible errors only when appropriate. Interpose an error-handling layer if needed, to impedance-match between the app's observed behavior and Ray's documented response to that behavior.
If you have app1 and app2 running on various clusters, you don't have to change a global config. The app1 behavior can be adjusted in this way:
future1 = test_function.remote(i) # original error behavior
future2 = test_function.options(max_retries=0).remote(i)  # no retries for this call