Say I have a DelayQueue as follows:

DelayQueue<DelayedObject> queue= new DelayQueue<>();

I would like to get the following behavior:

while (true) {
    DelayedObject next = queue.take();
    System.out.println("next message:" + next);
}

but with streams as follows:

queue.stream().forEach(e -> System.out.println(e));

However, I notice that this doesn't respect the delay and it also appears to be peeking and not taking. Is this expected behavior? Is there a better way to accomplish what I need with streams?

Here is a fully reproducible sample:



import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {
    public class DelayObject implements Delayed {
        private String data;
        private long startTime;

        public DelayObject(String data, long delayInMilliseconds) {
            this.data = data;
            this.startTime = System.currentTimeMillis() + delayInMilliseconds;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long diff = this.startTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the ArithmeticException that Math.toIntExact throws on large differences
            return Long.compare(this.startTime, ((DelayObject) o).startTime);
        }

        public String toString(){
            return this.data;
        }
    }

    public void startLoop() throws InterruptedException {
        BlockingQueue<DelayObject> delayQueue= new DelayQueue<>();
        DelayObject object1 = new DelayObject("Object1ExpiresIn1MS",1);
        DelayObject object2 = new DelayObject("Object1ExpiresIn10000MS",10000);
        delayQueue.add(object1);
        delayQueue.add(object2);

        while(!delayQueue.isEmpty()){
            System.out.println(delayQueue.take());
        }


    }

    public void startStream() throws InterruptedException {
        BlockingQueue<DelayObject> delayQueue= new DelayQueue<>();
        DelayObject object1 = new DelayObject("Object1ExpiresIn1MS",1);
        DelayObject object2 = new DelayObject("Object1ExpiresIn10000MS",10000);
        delayQueue.add(object1);
        delayQueue.add(object2);

        delayQueue.forEach(o->System.out.println(o));


    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueueExample ex1 = new DelayQueueExample();
        ex1.startLoop();      // Prints Object1ExpiresIn1MS almost immediately, then Object1ExpiresIn10000MS after about 10 seconds
        ex1.startStream();    // Prints both objects immediately, ignoring the delay and leaving them in the queue
    }
}

TKTheTechie
  • Does this answer your question? [Stream API and Queues: Subscribe to BlockingQueue stream-style](https://stackoverflow.com/questions/23462209/stream-api-and-queues-subscribe-to-blockingqueue-stream-style) – Sergey Vyacheslavovich Brunov Jan 26 '22 at 17:10
  • No it doesn't - the code there doesn't take - just peeks so the object is never removed from the queue. Also the delay is never respected either (I'm assuming because the stream is peeking and not taking) – TKTheTechie Jan 26 '22 at 18:45
  • Could you please provide a minimal reproducible example? Please, see: [How to create a Minimal, Reproducible Example - Help Center - Stack Overflow](https://stackoverflow.com/help/minimal-reproducible-example). – Sergey Vyacheslavovich Brunov Jan 26 '22 at 20:38
  • Yes sorry - done – TKTheTechie Jan 26 '22 at 22:36
  • Hmm ok, actually looking into the post you mentioned. If I use Stream.generate() I get the behavior that I need. – TKTheTechie Jan 26 '22 at 22:53
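
For anyone landing here later, the Stream.generate() idea from the last comment could look roughly like the sketch below. It is only an illustration under a few assumptions: the class name DelayQueueStreamSketch and the helper takingStream are made up for this example, and DelayObject is a trimmed copy of the class from the question. The point is that the supplier calls take(), so each element is removed from the queue and the stream blocks until the delay has expired. By contrast, queue.stream() and queue.forEach() go through the queue's iterator, which visits all elements (expired or not) without removing them.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class DelayQueueStreamSketch {

    /** Hypothetical element type, modeled on the DelayObject in the question. */
    public static final class DelayObject implements Delayed {
        private final String data;
        private final long startTime;

        public DelayObject(String data, long delayInMilliseconds) {
            this.data = data;
            this.startTime = System.currentTimeMillis() + delayInMilliseconds;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(startTime, ((DelayObject) o).startTime);
        }

        @Override
        public String toString() {
            return data;
        }
    }

    /**
     * Hypothetical helper: an infinite, sequential stream whose elements are
     * produced by the blocking take(), so each one is removed from the queue
     * only after its delay has expired.
     */
    public static <T extends Delayed> Stream<T> takingStream(DelayQueue<T> queue) {
        return Stream.generate(() -> {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and abort the stream.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting on the queue", e);
            }
        });
    }

    public static void main(String[] args) {
        DelayQueue<DelayObject> queue = new DelayQueue<>();
        queue.add(new DelayObject("expiresIn200ms", 200));
        queue.add(new DelayObject("expiresIn1ms", 1));

        // limit(2) keeps the demo finite; without it the stream blocks
        // forever on take() once the queue is empty.
        takingStream(queue)
                .limit(2)
                .forEach(o -> System.out.println("next message: " + o));
    }
}
```

The stream is infinite by design, just like the while (true) loop in the question, so it needs a limit(), a takeWhile() (Java 9+), or an interrupt to stop. It should also stay sequential: a parallel stream may take more elements from the queue than the limit ends up using.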