Say I have a DelayQueue as follows:
DelayQueue<DelayedObject> queue = new DelayQueue<>();
I would like to get the following behavior:
while (true) {
    DelayedObject next = queue.take();
    System.out.println("next message:" + next);
}
but with streams as follows:
queue.stream().forEach(e -> System.out.println(e));
However, I notice that this doesn't respect the delay, and it appears to peek at the elements rather than take them. Is this expected behavior? Is there a better way to accomplish what I need with streams?
Here is a fully reproducible sample:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {

    public class DelayObject implements Delayed {
        private String data;
        private long startTime; // absolute expiry time in epoch milliseconds

        public DelayObject(String data, long delayInMilliseconds) {
            this.data = data;
            this.startTime = System.currentTimeMillis() + delayInMilliseconds;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long diff = this.startTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the overflow exception Math.toIntExact can throw
            return Long.compare(this.startTime, ((DelayObject) o).startTime);
        }

        @Override
        public String toString() {
            return this.data;
        }
    }

    public void startLoop() throws InterruptedException {
        BlockingQueue<DelayObject> delayQueue = new DelayQueue<>();
        DelayObject object1 = new DelayObject("Object1ExpiresIn1MS", 1);
        DelayObject object2 = new DelayObject("Object1ExpiresIn10000MS", 10000);
        delayQueue.add(object1);
        delayQueue.add(object2);
        while (!delayQueue.isEmpty()) {
            System.out.println(delayQueue.take());
        }
    }

    public void startStream() throws InterruptedException {
        BlockingQueue<DelayObject> delayQueue = new DelayQueue<>();
        DelayObject object1 = new DelayObject("Object1ExpiresIn1MS", 1);
        DelayObject object2 = new DelayObject("Object1ExpiresIn10000MS", 10000);
        delayQueue.add(object1);
        delayQueue.add(object2);
        delayQueue.forEach(o -> System.out.println(o));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueueExample ex1 = new DelayQueueExample();
        ex1.startLoop();   // Prints Object1ExpiresIn1MS immediately, then Object1ExpiresIn10000MS after about 10 s
        ex1.startStream(); // Prints Object1ExpiresIn1MS and Object1ExpiresIn10000MS immediately
    }
}
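For comparison, here is a minimal sketch of the kind of stream I have in mind. As far as I can tell from the Javadoc, DelayQueue's iterator covers all elements, expired or not, and does not remove them, which would explain why stream() and forEach() only "peek". One possible workaround is to build the stream from take() with Stream.generate. The names takingStream, Item and drainInExpiryOrder are hypothetical, made up for this illustration, and the limit(2) is only there so the otherwise infinite stream terminates.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DelayQueueStreamSketch {

    // Hypothetical element type, mirroring DelayObject from the sample above.
    static final class Item implements Delayed {
        private final String data;
        private final long startTime; // absolute expiry time in epoch milliseconds

        Item(String data, long delayMs) {
            this.data = data;
            this.startTime = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Comparing remaining delays works for any Delayed, not only Item.
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return data;
        }
    }

    // Hypothetical helper: a lazy, infinite, sequential stream whose elements
    // come from take(), so each element is removed and the delay is respected.
    static <T> Stream<T> takingStream(BlockingQueue<T> queue) {
        return Stream.generate(() -> {
            try {
                return queue.take(); // blocks until the head element has expired
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        });
    }

    // Adds two items out of order and collects them through the taking stream.
    static List<String> drainInExpiryOrder() {
        BlockingQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item("late", 300));
        queue.add(new Item("early", 1));
        return takingStream(queue)
                .limit(2) // without a limit the stream blocks forever once the queue is empty
                .map(Item::toString)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder());
    }
}
```

The trade-off is that such a stream never ends on its own: a terminal operation without limit() (or some other short-circuit) simply blocks on the next take(), just like the while (true) loop does.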