
I have code that sends data to our queues, and the queues send an acknowledgement back once they have received the data. After sending, I wait for X amount of time and then check whether the acknowledgement arrived. Below is the code that does this, and it works:

  public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(800);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // If the key is no longer in the cache, the acknowledgement was received.
    sent = !acknowledgementCache.asMap().containsKey(address);
    // If the key is still present, no acknowledgement arrived within the timeout,
    // so remove it from the cache.
    if (!sent)
      removeFromRetryBucket(address);
    return sent;
  }

The problem with the code above is that I always wait the full 800 milliseconds, which is wrong. The acknowledgement might come back in 100 milliseconds, yet I still wait for 800. I want to return as soon as the acknowledgement arrives instead of always waiting the full X amount of time.

So I came up with the code below, which uses Awaitility, but for some reason it's not working as expected: even when the acknowledgement comes back quickly, it still times out. I also tried increasing the timeout to a very large value and it still times out, so something looks wrong. Is there a better way to do this?

  public boolean send(final long address, final byte[] records, final Socket socket) {
    boolean sent = sendAsync(address, records, socket, true);
    if (sent) {
      try {
        // if key is not present, then acknowledgement was received successfully
        Awaitility.await().atMost(800, TimeUnit.MILLISECONDS)
            .untilTrue(new AtomicBoolean(!acknowledgementCache.asMap().containsKey(address)));
        return true;
      } catch (ConditionTimeoutException ex) {
      }
    }
    // Either the send failed or the key is still in the cache after the timeout (no
    // acknowledgement received), so remove it from the cache.
    removeFromRetryBucket(address);
    return false;
  }

Note: I am working with Java 7 for now. I do have access to Guava, so if there is something better than Awaitility, I can use that as well.

flash
  • From what I gather, this could be caused by your condition only being checked once. You're creating the AtomicBoolean as false, and this value is never re-evaluated. I don't know Awaitility, but you might want to look into a lambda expression as the check, or their section on polling: https://github.com/awaitility/awaitility/wiki/Usage#polling – Ferdz Feb 20 '20 at 21:43
  • Also, you can look at this question, which looks very similar: https://stackoverflow.com/a/25325830/2535257 – Ferdz Feb 20 '20 at 21:45
  • @Ferdz That link is the first thing I checked, and it is why I started using Awaitility. I am going to check the polling option now. – flash Feb 20 '20 at 21:46
  • Please have a close look at the answer. Their usage of `until` differs from yours because they are providing the library with a lambda. This allows the library to re-run the check. In your case, you are using `untilTrue` with a value that never changes. – Ferdz Feb 20 '20 at 21:49
  • Why don't you move the check for whether it has been sent into the same thread? That way you can check it straight after rather than wait an arbitrary time on another thread. – Blundell Feb 20 '20 at 21:50
  • @Ferdz So that means I can either use `until` with a callable or use `polling`? It looks like `until` also uses polling underneath, so either of them will be fine for me, right? – flash Feb 20 '20 at 21:54
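
The point made in the comments above can be reproduced with only the JDK. The sketch below is not the asker's code: the `pending` map is a hypothetical stand-in for `acknowledgementCache.asMap()`, and the class and method names are made up for illustration. It shows that `new AtomicBoolean(expr)` stores the value of `expr` at construction time, whereas a `Callable` re-evaluates the expression each time it is called.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

class SnapshotVersusCallable {

    // Hypothetical stand-in for acknowledgementCache.asMap().
    static final ConcurrentMap<Long, byte[]> pending = new ConcurrentHashMap<Long, byte[]>();

    // Returns { value held by the AtomicBoolean, value returned by the Callable },
    // both read after the simulated acknowledgement has arrived.
    static boolean[] demo() {
        final long address = 42L;
        pending.put(address, new byte[0]); // message sent, no acknowledgement yet

        // The expression is evaluated once, right here, so the flag is created as false
        // and nothing ever updates it.
        AtomicBoolean snapshot = new AtomicBoolean(!pending.containsKey(address));

        // The expression is evaluated again on every call().
        Callable<Boolean> check = new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return !pending.containsKey(address);
            }
        };

        pending.remove(address); // the acknowledgement arrives

        boolean reEvaluated;
        try {
            reEvaluated = check.call();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        return new boolean[] { snapshot.get(), reEvaluated };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        // prints "snapshot=false, callable=true"
        System.out.println("snapshot=" + result[0] + ", callable=" + result[1]);
    }
}
```

This is why waiting on the `AtomicBoolean` times out regardless of how long the timeout is: the library keeps polling a flag that no code ever sets to true.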

1 Answer


Java 7 has no lambdas, so to let Awaitility re-check the condition you need to pass it a Callable.

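// Awaitility 2.x and later; 1.x used the com.jayway.awaitility package instead.
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;

import java.util.concurrent.Callable;

@Test
public void send() {
    // when
    boolean sent = sendAsync(address, records, socket, true);
    // then
    if (sent) {
        // The Callable is invoked on every poll until it returns false.
        await().until(receivedPackageCount(address), equalTo(false));
    }
}

// Returns true while the key is still in the cache, i.e. no acknowledgement yet.
private Callable<Boolean> receivedPackageCount(final long address) {
    return new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            return acknowledgementCache.asMap().containsKey(address);
        }
    };
}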

It should look something like the above. There may be compilation errors because I wrote it without an IDE.
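
If pulling in Awaitility is not an option, the same polling idea can be sketched with only the JDK, in Java 7 style. This is a rough illustration under stated assumptions, not the asker's actual classes: `pending` is a hypothetical stand-in for `acknowledgementCache.asMap()`, removing the key on timeout stands in for `removeFromRetryBucket(address)`, and `waitUntil`, `awaitAcknowledgement` and the 10 ms poll interval are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

class AckPolling {

    // Hypothetical stand-in for acknowledgementCache.asMap().
    static final ConcurrentMap<Long, byte[]> pending = new ConcurrentHashMap<Long, byte[]>();

    // Re-runs the condition every pollMillis until it returns true or the timeout elapses.
    static boolean waitUntil(Callable<Boolean> condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                if (condition.call()) {
                    return true;
                }
                if (System.nanoTime() - deadline >= 0) {
                    return false;
                }
                TimeUnit.MILLISECONDS.sleep(pollMillis);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Returns as soon as the key disappears from the map, or false after the timeout.
    static boolean awaitAcknowledgement(final long address, long timeoutMillis) {
        boolean acknowledged = waitUntil(new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return !pending.containsKey(address);
            }
        }, timeoutMillis, 10);
        if (!acknowledged) {
            pending.remove(address); // assumption: mirrors removeFromRetryBucket(address)
        }
        return acknowledged;
    }

    // Simulates an acknowledgement that arrives after roughly 50 ms.
    static boolean demoFastAck() {
        final long address = 1L;
        pending.put(address, new byte[0]);
        Thread acker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                pending.remove(address);
            }
        });
        acker.start();
        return awaitAcknowledgement(address, 800);
    }

    // Simulates an acknowledgement that never arrives.
    static boolean demoNoAck() {
        pending.put(2L, new byte[0]);
        return awaitAcknowledgement(2L, 100);
    }

    public static void main(String[] args) {
        System.out.println("fast ack: " + demoFastAck());
        System.out.println("no ack: " + demoNoAck());
    }
}
```

In the fast case the call returns shortly after the key is removed instead of sleeping the full 800 ms. Polling still adds up to one poll interval of latency, which is the trade-off compared with having the acknowledgement handler signal the waiting thread directly.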

Ömer Faruk AK