I have code that sends data to our queues; each queue then sends back an acknowledgement saying it has received the data. I wait a fixed amount of time before checking whether the acknowledgement has arrived. Below is the code that does this, and it works:
public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
        try {
            // always wait the full timeout before checking for the acknowledgement
            TimeUnit.MILLISECONDS.sleep(800);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
    // if the key is no longer in the cache, the acknowledgement was received successfully
    sent = !acknowledgementCache.asMap().containsKey(address);
    // if the key is still in the cache, the acknowledgement was not received within
    // the timeout period, so we remove it from the cache
    if (!sent) {
        removeFromRetryBucket(address);
    }
    return sent;
}
The problem with the above code is that I wait for 800 milliseconds no matter what, and that is wrong. The acknowledgement may come back in 100 milliseconds, yet I still wait the full 800. I want to return as soon as the acknowledgement arrives instead of always waiting the full timeout.
So I came up with the code below, which uses Awaitility, but for some reason it is not working as expected: even when the acknowledgement comes back quickly, it still times out. I also tried increasing the timeout to a very high value and it still times out, so something looks wrong. Is there a better way to do this?
public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
        try {
            // if key is not present, then acknowledgement was received successfully
            Awaitility.await().atMost(800, TimeUnit.MILLISECONDS)
                .untilTrue(new AtomicBoolean(!acknowledgementCache.asMap().containsKey(address)));
            return true;
        } catch (ConditionTimeoutException ex) {
        }
    }
    // if the key is still present in the cache, the acknowledgement was not received
    // within the timeout period, so we remove it from the cache
    removeFromRetryBucket(address);
    return false;
}
Note: I am working with Java 7 for now. I do have access to Guava, so if there is something better than Awaitility, I can use that as well.
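To check that in Java 7 you need to write a Callable. In your version, the expression passed to new AtomicBoolean(...) is evaluated only once, when the AtomicBoolean is constructed, so Awaitility keeps polling a value that never changes and always times out. A Callable is re-evaluated on every poll.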
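@Test
public void send() {
    // when
    boolean sent = sendAsync(address, records, socket, true);
    // then
    if (sent) {
        // passes once the key is no longer in the cache
        await().until(receivedPackageCount(address), equalTo(false));
    }
}

// address must be final so the anonymous class can capture it in Java 7
private Callable<Boolean> receivedPackageCount(final long address) {
    return new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            // true while the acknowledgement is still pending
            return acknowledgementCache.asMap().containsKey(address);
        }
    };
}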
It should look something like the above. There may be compilation errors because I wrote it without an IDE.
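If you would rather not depend on Awaitility, the same idea can be sketched with the plain JDK: poll a Callable until it returns true or a deadline passes. This is only a rough sketch, not a drop-in replacement. AckWaiter, waitUntil, acknowledged and the 10 ms poll interval are names and values I made up for illustration, and a ConcurrentHashMap stands in for acknowledgementCache.asMap().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Awaitility or Guava.
public class AckWaiter {

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met in time, false otherwise.
     */
    public static boolean waitUntil(Callable<Boolean> condition, long timeout,
                                    TimeUnit unit, long pollMillis) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            boolean met;
            try {
                // The condition is re-evaluated on every pass through the loop.
                met = Boolean.TRUE.equals(condition.call());
            } catch (Exception ex) {
                throw new IllegalStateException("condition check failed", ex);
            }
            if (met) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                TimeUnit.MILLISECONDS.sleep(pollMillis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Condition that becomes true once the key has been removed from the map. */
    public static Callable<Boolean> acknowledged(final ConcurrentMap<Long, byte[]> pending,
                                                 final long address) {
        return new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return !pending.containsKey(address);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for acknowledgementCache.asMap() (an assumption for this demo).
        final ConcurrentMap<Long, byte[]> pending = new ConcurrentHashMap<Long, byte[]>();
        final long address = 42L;
        pending.put(address, new byte[] {1});

        // Simulate the acknowledgement arriving after roughly 100 ms.
        Thread ack = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                pending.remove(address);
            }
        });
        ack.start();

        boolean sent = waitUntil(acknowledged(pending, address), 800, TimeUnit.MILLISECONDS, 10);
        ack.join();
        System.out.println("acknowledged in time: " + sent);
    }
}
```

In your send method you would call something like waitUntil(acknowledged(acknowledgementCache.asMap(), address), 800, TimeUnit.MILLISECONDS, 10) after sendAsync succeeds, and call removeFromRetryBucket(address) only when it returns false. Polling still adds up to one poll interval of latency. If the code that receives the acknowledgement is under your control, signalling a per-address CountDownLatch from there would avoid polling altogether.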