I am removing Hazelcast from an application that no longer needs distributed caching.
However, I still need synchronized access to a Map, similar to what IMap provided.
My first idea was to replace the IMap with a Map backed by a ConcurrentHashMap.
The collection is now a plain Java Map, but I still need to control concurrent reads and writes to it.
Is a ConcurrentHashMap enough for this, or do I need an additional locking mechanism?
The application uses RabbitMQ with 3 queues.
With Hazelcast I had an IMap of domain objects, and each MQ thread locked the entry it was accessing.
During initialization of a batch job, new domain objects are created and placed in the Map.
At the end of initialization, a request is sent on 3 different message queues for each object in the Map.
Only one MQ thread at a time should be able to get an object from the Map, update it, and put it back.
Can ConcurrentHashMap handle that, or do I need to implement some locking/unlocking between the Map.get and the Map.put?
With Hazelcast this was done by locking on the Map key.
IMap<Long, MyObject> myCollection;

public void processMessage1(final MyResponse1 response) { // MQ-Thread1
    final Long myObjectId = response.getObjectId();
    // Lock only this key; entries under other keys stay accessible
    myCollection.lock(myObjectId);
    try {
        final MyObject myObject = myCollection.get(myObjectId);
        updateMyObject(myObject, response);
        myCollection.put(myObjectId, myObject);
        if (myObject.isCompleted()) {
            repository.save(myObject);
        }
    } finally {
        // Always release the key lock, even if the update throws
        myCollection.unlock(myObjectId);
    }
}

public void processMessage2(final MyResponse2 response) { // MQ-Thread2
    // Similar with locking as processMessage1
}

public void processMessage3(final MyResponse3 response) { // MQ-Thread3
    // Similar with locking as processMessage1
}
This is my code now, after replacing Hazelcast and the IMap locking with ConcurrentHashMap:
@Data
@Component
public class MyCache {
    private final Map<Long, Producer> producers = new HashMap<>();
    private final ConcurrentMap<Long, Customer> customers = new ConcurrentHashMap<>();
}

@Service
public class CustomerService {
    private final MyCache myCache;
    private final StorageService storageService;

    public CustomerService(final MyCache myCache, final StorageService storageService) {
        this.myCache = myCache;
        this.storageService = storageService;
    }

    @Transactional
    public void startProcessingAll() {
        getProducers();
        deleteCustomers();
        myCache.getProducers().keySet().forEach(id -> startProcessingProducer(id));
    }

    private void startProcessingProducer(Long producerId) {
        log.info("Started processing for producerId={}", producerId);
        initialize(producerId);
    }

    private void initialize(Long producerId) {
        log.info("Initialize start {}", producerId);
        final Customer customer = new Customer();
        sendRabbitMq1(producerId);
        sendRabbitMq2(producerId);
        sendRabbitMq3(producerId);
        myCache.getCustomers().put(producerId, customer);
        log.info("Initialize end {}", producerId);
    }

    private void deleteCustomers() {
        storageService.deleteCustomers();
    }
}

@Service
public class MessageService {
    private final MyCache myCache;
    private final StorageService storageService;

    public MessageService(final MyCache myCache, final StorageService storageService) {
        this.myCache = myCache;
        this.storageService = storageService;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse1> response1() {
        return this::processResponse1;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse2> response2() {
        return this::processResponse2;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse3> response3() {
        return this::processResponse3;
    }

    private void processResponse1(final MyResponse1 response) {
        myCache.getCustomers().computeIfPresent(response.getProducerId(),
            (producerId, customer) -> updateFromResponse1(response, customer));
    }

    private Customer updateFromResponse1(final MyResponse1 response, final Customer customer) {
        log.debug("Got Response 1 {}", response);
        // Updating Customer with Response
        process(customer, storageService::saveCustomer);
        return customer;
    }

    public void process(final Customer customer, final Consumer<Customer> customerUpdateCallback) {
        // Processing Customer
        customerUpdateCallback.accept(customer);
    }
}

@Service
public class StorageService {
    private final CustomerRepository customerRepository;

    public StorageService(final CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }

    public void saveCustomer(final Customer customer) {
        customerRepository.save(customer);
    }

    // Named deleteCustomers to match the call in CustomerService
    public void deleteCustomers() {
        customerRepository.deleteAll();
    }
}

ConcurrentHashMap can handle per-key locking through one of the following methods: compute, computeIfAbsent, or computeIfPresent.
The key point is what the Javadoc states:
The entire method invocation is performed atomically. The supplied function is invoked exactly once per invocation of this method.
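To make that guarantee concrete, here is a minimal, self-contained sketch. `AtomicUpdateDemo`, `Counter`, and `runDemo` are hypothetical names invented for illustration and are not part of the question's code. Several threads update the same entry through computeIfPresent. Because the remapping function runs atomically for a given key, no increment should be lost, even though `Counter` itself is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AtomicUpdateDemo {

    // Hypothetical stand-in for the domain object; deliberately not thread-safe.
    static final class Counter {
        int responses;
    }

    // Runs `threads` workers that each update the same key `updatesPerThread` times.
    static int runDemo(int threads, int updatesPerThread) throws Exception {
        ConcurrentMap<Long, Counter> cache = new ConcurrentHashMap<>();
        cache.put(1L, new Counter());

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<?>> pending = new ArrayList<>();
        try {
            for (int t = 0; t < threads; t++) {
                pending.add(pool.submit(() -> {
                    for (int i = 0; i < updatesPerThread; i++) {
                        // The lambda runs atomically with respect to other
                        // compute-style calls on the same key.
                        cache.computeIfPresent(1L, (key, counter) -> {
                            counter.responses++;
                            return counter;
                        });
                    }
                }));
            }
            // Wait for every worker; Future.get also makes their writes visible here.
            for (Future<?> f : pending) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return cache.get(1L).responses;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo(3, 10_000)); // prints 30000
    }
}
```

Note that the protection only covers code running inside the remapping function. A thread that fetches the object with a plain get and mutates it outside of compute or computeIfPresent is not synchronized with these updates.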
If we adapt your code sample to work for update with a ConcurrentHashMap, it would give something like this:
final Map<Long, MyObject> myCollection = new ConcurrentHashMap<>();

public void processMessage1(final MyResponse1 response) { // MQ-Thread1
    final Long myObjectId = response.getObjectId();
    // The lambda runs atomically for this key, replacing the explicit lock/unlock
    myCollection.computeIfPresent(myObjectId, (key, oldValue) -> {
        // Assumes updateMyObject returns the updated object
        var newValue = updateMyObject(oldValue, response);
        if (newValue.isCompleted()) {
            repository.save(newValue);
        }
        return newValue;
    });
}
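
One detail to keep in mind with this approach: computeIfPresent does nothing when the key is not in the map yet, so the entry presumably needs to be put into the map before the requests are sent to the queues. The sketch below only illustrates that behavior. `PutBeforeSendDemo` and `handleResponse` are hypothetical names, and a StringBuilder stands in for the domain object.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutBeforeSendDemo {

    // Returns true if an entry existed for `id` and was updated with the payload.
    static boolean handleResponse(ConcurrentMap<Long, StringBuilder> cache,
                                  long id, String payload) {
        // computeIfPresent returns null when there is no mapping for the key,
        // in which case the remapping function is never called.
        return cache.computeIfPresent(id, (key, value) -> value.append(payload)) != null;
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, StringBuilder> cache = new ConcurrentHashMap<>();

        // A response arriving before the entry exists is silently skipped.
        System.out.println(handleResponse(cache, 42L, "r1")); // prints false

        // Once the entry is in the map, the same response is applied.
        cache.put(42L, new StringBuilder());
        System.out.println(handleResponse(cache, 42L, "r1")); // prints true
    }
}
```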