We are trying to use Hazelcast EntryProcessor. Our understanding was that the EntryProcessor.process method should be called once on the node where the primary copy exists.
Here is our attempt:
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;

import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.IntStream;

public class HazelcastBackupEntryProcessorTest {

    public static void main(String[] args) throws InterruptedException {
        // Start a two-member cluster inside this JVM and use the first member.
        List<HazelcastInstance> hazelcastInstances = setupHazelcast(2);
        HazelcastInstance hazelcastInstance = hazelcastInstances.get(0);
        IMap<Integer, String> testMap;
        testMap = hazelcastInstance.getMap("testMap");
        if (!testMap.containsKey(1)) {
            testMap.put(1, "Holmes");
            testMap.executeOnEntries(new UpdateNameEntryProcessor("Watson"));
        }
    }

    private static List<HazelcastInstance> setupHazelcast(int numberOfInstances) {
        List<HazelcastInstance> instances = new ArrayList<>();
        IntStream.range(0, numberOfInstances).forEach(i -> {
            Config config = new Config();
            config.getJetConfig().setEnabled(true);
            instances.add(Hazelcast.newHazelcastInstance(config));
        });
        return instances;
    }

    private static class UpdateNameEntryProcessor implements EntryProcessor<Integer, String, String> {
        private final String name;

        public UpdateNameEntryProcessor(String name) {
            this.name = name;
        }

        @Override
        public String process(Entry<Integer, String> entry) {
            // Printed once per invocation, which is how the double call shows up.
            System.out.println("Coming Here");
            entry.setValue(name);
            return null;
        }
    }
}
What could explain why the process method is being called twice?
We tried changing the number of Hazelcast instances. With one instance, the process method is invoked once. With any count greater than 1, it is called twice.
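The default backup count of an IMap is 1, so when an entry is added, a backup copy of it also exists on a second member. executeOnEntries() therefore runs the processor on both members: once on the member that owns the primary copy and once on the member that holds the backup. This applies the change to the backup copy directly, which makes data propagation faster.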
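The relationship between member count, backup count, and the number of process calls can be sketched with a small model. It is plain Java and does not use Hazelcast at all. The class and method names are made up for illustration. It assumes that the processor is also used as its own backup processor and that a backup can only live on a member other than the owner.

```java
// Toy model only: no Hazelcast classes are used here.
final class ReplicaInvocationModel {

    // A backup replica needs a member other than the owner, so the number
    // of backups that actually exist is capped at members - 1.
    static int effectiveBackups(int members, int backupCount) {
        return Math.max(0, Math.min(backupCount, members - 1));
    }

    // One call on the primary replica plus one call per existing backup,
    // assuming the processor is also its own backup processor.
    static int processCalls(int members, int backupCount) {
        return 1 + effectiveBackups(members, backupCount);
    }

    public static void main(String[] args) {
        System.out.println(processCalls(1, 1)); // one member, backup count 1: prints 1
        System.out.println(processCalls(2, 1)); // two members, backup count 1: prints 2
        System.out.println(processCalls(2, 0)); // two members, backup count 0: prints 1
    }
}
```

With a single member there is nowhere to place a backup, which is consistent with the single invocation seen on a one-instance cluster.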
If you strictly need the processor to execute on only one member, there are two solutions.
The first is to override the getBackupProcessor() method so that it returns null. The entry processor then does not run on the backups. Hazelcast will still sync the primary data to the backup, but with some latency.
private static class UpdateNameEntryProcessor
        implements EntryProcessor<Integer, String, String> {
    private final String name;

    public UpdateNameEntryProcessor(String name) {
        this.name = name;
    }

    // Returning null means no processor is run on the backup replicas.
    @Override
    public EntryProcessor<Integer, String, String> getBackupProcessor() {
        return null;
    }

    @Override
    public String process(Map.Entry<Integer, String> entry) {
        // Same as your code
        System.out.println("Coming Here");
        entry.setValue(name);
        return null;
    }
}
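To make the trade-off of returning null more concrete, here is a plain-Java sketch. ToyProcessor, backupProcessor, and run are hypothetical stand-ins, not Hazelcast classes or methods, and the sketch does not model the later sync that Hazelcast performs between the primary and the backup. It also shows one possible variant: a separate backup processor that only sets the value, so the side effect fires once while the backup is still updated right away.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only. ToyProcessor is a hypothetical stand-in, not Hazelcast's EntryProcessor.
final class BackupProcessorModel {

    interface ToyProcessor {
        void process(Map<Integer, String> replica, Integer key);

        // Plays the role of getBackupProcessor(): null means "run nothing on the backup".
        default ToyProcessor backupProcessor() {
            return this;
        }
    }

    // Counts how often the visible side effect (the println in the question) would fire.
    static int sideEffects = 0;

    // Case 1: the same processor runs on the primary and on the backup.
    static ToyProcessor sameOnBackup(String name) {
        return (replica, key) -> {
            sideEffects++;
            replica.put(key, name);
        };
    }

    // Case 2: no processor runs on the backup.
    static ToyProcessor noBackup(String name) {
        return new ToyProcessor() {
            @Override
            public void process(Map<Integer, String> replica, Integer key) {
                sideEffects++;
                replica.put(key, name);
            }

            @Override
            public ToyProcessor backupProcessor() {
                return null;
            }
        };
    }

    // Case 3: the backup gets a processor that only sets the value.
    static ToyProcessor quietBackup(String name) {
        return new ToyProcessor() {
            @Override
            public void process(Map<Integer, String> replica, Integer key) {
                sideEffects++;
                replica.put(key, name);
            }

            @Override
            public ToyProcessor backupProcessor() {
                return (replica, key) -> replica.put(key, name);
            }
        };
    }

    // Runs the processor against a fresh primary/backup pair and reports the outcome.
    static String run(ToyProcessor processor) {
        sideEffects = 0;
        Map<Integer, String> primary = new HashMap<>();
        Map<Integer, String> backup = new HashMap<>();
        primary.put(1, "Holmes");
        backup.put(1, "Holmes");

        processor.process(primary, 1);
        ToyProcessor onBackup = processor.backupProcessor();
        if (onBackup != null) {
            onBackup.process(backup, 1);
        }
        return "sideEffects=" + sideEffects
                + " primary=" + primary.get(1)
                + " backup=" + backup.get(1);
    }

    public static void main(String[] args) {
        System.out.println(run(sameOnBackup("Watson"))); // sideEffects=2 primary=Watson backup=Watson
        System.out.println(run(noBackup("Watson")));     // sideEffects=1 primary=Watson backup=Holmes
        System.out.println(run(quietBackup("Watson")));  // sideEffects=1 primary=Watson backup=Watson
    }
}
```

In Hazelcast terms, the third case would correspond to getBackupProcessor() returning a second EntryProcessor whose process method only calls entry.setValue(name). Whether that fits depends on what the side effect in process is meant to do.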
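The second is to set the backup count of the map to 0, so that no backup copy exists for the processor to run on. Keep in mind that without a backup, the entry is lost if the member that owns it goes down.
private static List<HazelcastInstance> setupHazelcast(int numberOfInstances) {
    List<HazelcastInstance> instances = new ArrayList<>();
    IntStream.range(0, numberOfInstances).forEach(i -> {
        Config config = new Config();
        // "testMap" is the map name used in the question.
        MapConfig mapConfig = config.getMapConfig("testMap");
        mapConfig.setBackupCount(0);
        JetConfig jetConfig = config.getJetConfig();
        jetConfig.setEnabled(true);
        instances.add(Hazelcast.newHazelcastInstance(config));
    });
    return instances;
}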