Tags: java, multithreading, thread-safety, callable, atomicreference

How to add a hostname to a block list after consecutive failures in a multithreaded application?


I am using a Callable in my code, which will be called by multiple threads as shown below. As of now, whenever a RestClientException is thrown, I add the hostname to the block list.

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        ResponseEntity<String> response = null;

        // build the list of hostnames I can call, based on the user id
        List<String> hostnames = some_code_here;

        for (String hostname : hostnames) {
            // If host name is null or host name is in block list, skip sending request to this host
            if (DataUtils.isEmpty(hostname) || DataMapping.isBlocked(hostname)) {
                continue;
            }
            try {
                String url = createURL(hostname);
                response = restTemplate.exchange(url, HttpMethod.GET, key.getEntity(), String.class);

                // some code here to return the response if successful
            } catch (HttpClientErrorException ex) {
                // log exception
                return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
            } catch (HttpServerErrorException ex) {
                // log exception
                return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
            } catch (RestClientException ex) {
                // I don't want to add it to the block list instantly.
                // Only add it if the same hostname has failed five times consecutively.
                DataMapping.blockHost(hostname);
            }
        }

        return new DataResponse(DataErrorEnum.SERVER_UNAVAILABLE, DataStatusEnum.ERROR);        
    }
}

Below is what I have in the DataMapping class:

private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = 
        new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());

public static boolean isBlocked(String hostName) {
    return blockedHosts.get().containsKey(hostName);
}

public static void blockHost(String hostName) {
    blockedHosts.get().put(hostName, hostName);
}

Problem Statement:

As you can see in the call method, I block the hostname as soon as it throws a RestClientException, which may not be right. I want to add a hostname to the block list, by calling DataMapping.blockHost(hostname), only if that particular hostname has thrown RestClientException five times consecutively. Otherwise it should not be added.

What is the most efficient way to do this? I will have at most 70-100 unique machines in total.

In this case, my call method will be called from multiple threads so I need to make sure I am keeping the count properly for each hostname in case they throw RestClientException.

EDIT:

I also have the method below in the DataMapping class.

I have a background thread that runs every 2 minutes and replaces the entire set, since my service provides real data on whether any hostname is really blocked or not. I guess I do need the atomic reference for when I replace the entire set.

I am also adding the block feature locally in the code, since otherwise I might only get to know which machine is blocked after 2 minutes, so it is better to know it beforehand if possible.

// this is being updated from my background thread which runs every 2 minutes
public static void replaceBlockedHosts(List<String> hostNames) {
    ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
    for (String hostName : hostNames) {
        newBlockedHosts.put(hostName, hostName);
    }
    blockedHosts.set(newBlockedHosts);
}
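
For readers who want to see the "replace the whole set" pattern in isolation, here is a minimal sketch. It is an illustration rather than the code from the question: the class name `BlockList` is hypothetical, and a concurrent `Set<String>` from `ConcurrentHashMap.newKeySet()` is swapped in for the `ConcurrentHashMap<String, String>` used above.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the DataMapping class in the question.
final class BlockList {

    // The reference always points at the current set of blocked hosts.
    private static final AtomicReference<Set<String>> blocked =
            new AtomicReference<>(ConcurrentHashMap.<String>newKeySet());

    static boolean isBlocked(String host) {
        return blocked.get().contains(host);
    }

    // Local, immediate blocking from a worker thread.
    static void blockHost(String host) {
        blocked.get().add(host);
    }

    // Called by the background thread: build a fresh set, then swap it in
    // with a single write so readers never observe a half-filled set.
    static void replaceBlockedHosts(List<String> hosts) {
        Set<String> fresh = ConcurrentHashMap.newKeySet();
        fresh.addAll(hosts);
        blocked.set(fresh);
    }
}
```

One consequence of this design, which also applies to the original code, is that a host blocked locally just before a swap is dropped if the new list does not contain it.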

Solution

  • I would associate each host with an AtomicInteger that is incremented on each RestClientException. This integer would be reset to zero on a successful call to enforce the "five consecutive times" constraint. The code would look something like this.

    private final ConcurrentHashMap<String, AtomicInteger> failedCallCount = new ConcurrentHashMap<>();
    
    void call() {
        try {
            String url = createURL(host);
            // make rest call
            resetFailedCallCount(host);
            // ...
        } catch (RestClientException ex) {
            registerFailedCall(host);
            if (shouldBeBlocked(host)) {
                DataMapping.blockHost(host);
            }
        }
    }
    
    
    private boolean shouldBeBlocked(String hostName) {
        AtomicInteger count = failedCallCount.getOrDefault(hostName, new AtomicInteger());
        return count.get() >= 5;
    }
    
    private void registerFailedCall(String hostName) {
        AtomicInteger newValue = new AtomicInteger();
        AtomicInteger val = failedCallCount.putIfAbsent(hostName, newValue);
        if (val == null) {
            val = newValue;
        }
        if (val.get() < 5) {
            val.incrementAndGet();
        }
    }
    
    private void resetFailedCallCount(String hostName) {
        AtomicInteger count = failedCallCount.get(hostName);
        if (count != null) {
            count.set(0);
        }
    }
    

    This is lock-free (in our own code at least) and very efficient. However, it is vulnerable to some race conditions. Most notably, the count can become larger than 5, because the check val.get() < 5 and the following incrementAndGet() are not performed as one atomic step. That should not be a problem, since the host is blocked anyway and the count is not used for anything else.
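
If the overshoot is a concern, one possible variation is to do the increment and the cap in a single atomic update. The sketch below is an assumption-laden illustration, not the answer's original code: the class name `FailureTracker` and its methods are hypothetical, and it relies on `AtomicInteger.updateAndGet` and `ConcurrentHashMap.computeIfAbsent`, both available since Java 8.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that tracks consecutive failures per host.
final class FailureTracker {

    private final int threshold;
    private final ConcurrentHashMap<String, AtomicInteger> failures = new ConcurrentHashMap<>();

    FailureTracker(int threshold) {
        this.threshold = threshold;
    }

    // Records one failure and returns true once the host has reached the threshold.
    boolean recordFailure(String host) {
        AtomicInteger count = failures.computeIfAbsent(host, h -> new AtomicInteger());
        // Increment and cap in one atomic step, so the count never exceeds the threshold.
        int updated = count.updateAndGet(c -> Math.min(c + 1, threshold));
        return updated >= threshold;
    }

    // A successful call ends the streak of consecutive failures.
    void recordSuccess(String host) {
        AtomicInteger count = failures.get(host);
        if (count != null) {
            count.set(0);
        }
    }

    // Current number of consecutive failures, 0 for unknown hosts.
    int failureCount(String host) {
        AtomicInteger count = failures.get(host);
        return count == null ? 0 : count.get();
    }
}
```

In the catch block for RestClientException this could be used as `if (tracker.recordFailure(hostname)) DataMapping.blockHost(hostname);`, with `tracker.recordSuccess(hostname)` after a successful call, assuming a single `tracker` instance is shared by all tasks. Note that "consecutive" is still only approximate under concurrency: a success on one thread can reset the count while another thread is about to record a failure.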