9

I have a data structure like this:

BlockingQueue<Mail> mailbox = new LinkedBlockingQueue<>();

I'm trying to do this:

for(Mail mail: mailbox)
{
    if(badNews(mail))
    {
        mailbox.remove(mail);
    }
}

Obviously the contents of the loop interfere with the bounds and an error is triggered, so I would normally do this:

for(int i = 0;  i < mailbox.size(); i++)
{
    if(badNews(mailbox.get(i)))
    {
        mailbox.remove(i);
        i--;
    }
}

But sadly BlockingQueues don't have a method to get or remove an element by index, so I'm stuck. Any ideas?

Edit - A few clarifications: One of my goals is to maintain the same ordering, so popping from the head and putting it back onto the tail is no good. Also, although no other threads will remove mail from a mailbox, they will add to it, so I don't want to be in the middle of a removal algorithm, have someone send me mail, and then have an exception occur.

Thanks in advance!

Josh

4 Answers

3

You may poll and offer all the elements in your queue until you have made one complete pass over it. Here's an example:

Mail firstMail = mailbox.peek();
Mail currentMail = mailbox.poll(); // BlockingQueue has no pop(); poll() returns null if the queue is empty
while (true) {
    //a base condition to stop the loop
    Mail tempMail = mailbox.peek();
    if (tempMail == null || tempMail.equals(firstMail)) {
        mailbox.offer(currentMail);
        break;
    }
    //if there's nothing wrong with the current mail, then re add to mailbox
    if (!badNews(currentMail)) {
        mailbox.offer(currentMail);
    }
    currentMail = mailbox.poll();
}

Note that this approach will work only if this code is executed in a single thread and there's no other thread that removes items from this queue.

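You should also check whether you really want to poll or take elements from the BlockingQueue: poll returns null immediately when the queue is empty, whereas take blocks until an element is available. The same choice applies to offer and put.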
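
To make the poll/take and offer/put distinction concrete, here is a minimal sketch. It assumes a bounded LinkedBlockingQueue of capacity 1 so the "queue full" case can be shown; the class name QueueMethodsDemo and the method describe are made up for illustration and are not part of the original answer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the original answer.
class QueueMethodsDemo {

    /** Exercises the non-blocking, timed, and blocking calls and reports the results. */
    static String describe() {
        // Assumption: capacity 1, so the second offer finds the queue full.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            String emptyPoll = queue.poll();        // null: empty queue, returns at once
            boolean firstOffer = queue.offer("a");  // true: there was room
            boolean secondOffer = queue.offer("b"); // false: full queue, returns at once
            // The timed variant waits up to the timeout before giving up.
            boolean timedOffer = queue.offer("c", 10, TimeUnit.MILLISECONDS);
            String taken = queue.take();            // "a"; would block only if the queue were empty
            queue.put("d");                         // succeeds; would block only if the queue were full
            return emptyPoll + "," + firstOffer + "," + secondOffer + ","
                    + timedOffer + "," + taken + "," + queue.peek();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // restore the interrupt flag
            throw new IllegalStateException("interrupted while using the queue", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "null,true,false,false,a,d"
    }
}
```

With the unbounded queue from the question, offer should not fail for lack of space, so the choice that matters most there is poll versus take.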


Another, less buggy approach is to use a temporary collection (not necessarily a concurrent one) to hold the elements you still need, and then put them back into the queue. Here's a kickoff example:

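List<Mail> mailListTemp = new ArrayList<>();
// poll() returns null once the queue is empty, so there is no need to peek
// first or to handle the InterruptedException that take() declares.
Mail mail;
while ((mail = mailbox.poll()) != null) {
    if (!badNews(mail)) {
        mailListTemp.add(mail);
    }
}
// Re-add the surviving mails in their original order.
for (Mail kept : mailListTemp) {
    mailbox.offer(kept);
}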
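
The same temporary-collection idea can also be sketched with BlockingQueue.drainTo, which moves every currently available element into a list in one call. This is only a sketch under assumptions: the class DrainAndRefill and the method purge are hypothetical names, the predicate stands in for the question's badNews check, and the queue is assumed to be unbounded so that offer does not fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;

// Hypothetical helper class, not from the original answer.
class DrainAndRefill {

    /**
     * Drains the queue, drops every element matching isBad, and re-offers the
     * rest in their original order. Returns the number of dropped elements.
     */
    static <T> int purge(BlockingQueue<T> queue, Predicate<? super T> isBad) {
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained);          // removes all available elements without blocking
        int before = drained.size();
        drained.removeIf(isBad);
        // Caveat: anything a producer adds between drainTo and this loop
        // ends up ahead of the survivors, so ordering is not guaranteed
        // while other threads are adding to the queue.
        for (T item : drained) {
            queue.offer(item);           // assumption: unbounded queue, so this succeeds
        }
        return before - drained.size();
    }
}
```

A call might look like `DrainAndRefill.purge(mailbox, mail -> badNews(mail))`. It shares the ordering caveat of the loop above when producers are active.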
Luiggi Mendoza
  • @ScaryWombat *fixed*. Still, OP doesn't notify us if other threads remove items from the queue as well. – Luiggi Mendoza Oct 21 '14 at 04:58
  • @ScaryWombat well, I haven't read any previous question, so I lack this context. – Luiggi Mendoza Oct 21 '14 at 04:59
  • Maybe I am missing your point, but doesn't `peek` return null if the queue is empty (non-blocking)? In your previous version, if the queue was empty before entering your loop, it would have thrown an NPE. – Scary Wombat Oct 21 '14 at 05:04
  • @Josh if those are your conditions, then yes, it will. But still you have to test it. – Luiggi Mendoza Oct 21 '14 at 05:05
  • @ScaryWombat reviewed the code after your comment and fixed my big mistake. There's no `peek` equivalent method, but after checking the source code, it applies a `ReentrantLock` before retrieving (but not removing) the head element in the queue. – Luiggi Mendoza Oct 21 '14 at 05:11
  • Ok, thanks for all the help. I'll trace through it and try to figure out how it works. – Josh Oct 21 '14 at 05:28
  • My question is why the offer statement isn't also right before the break. It looks like the last element is never being added back. – Josh Oct 21 '14 at 05:57
  • There's one other thing I see: if the first element gets removed, the loop never breaks. – Josh Oct 21 '14 at 06:04
  • @Josh, this is just an example. You can adapt the code to your needs. Also, you may unmark this post as the answer. It's very late here in my country so I'll have some sleep time and rethink a proper solution for next time. Hint: you may use an additional queue (not necessarily concurrent) to store the elements you still need, then re add all the elements from this queue into the current queue. – Luiggi Mendoza Oct 21 '14 at 06:07
  • Ok, I think I made a good adaptation and just posted it for review. Thank you for putting me on the right track! – Josh Oct 21 '14 at 06:11
  • @Josh answer updated providing an example of the last approach. Please let me know of any other problem, but probably I'll review it in some hours =\ – Luiggi Mendoza Oct 21 '14 at 06:14
  • Yeah, the new solution works and solves the problem in at most 2n steps. It looks really good. The one I posted below may do it in only n steps though. I'm not positive, but it seems to work. Regardless, your answer led me to the solution so I'm marking yours correct. (And if mine is wrong, please let me know.) Thanks! – Josh Oct 21 '14 at 06:28
  • Okay, after studying the new solution I see that once the first pass is made and no elements remain in the queue, another thread could add one and then when the old items are added back, they are at the back of the queue. The new element is supposed to be back there instead. I gave it another try below. Maybe this one is best. What do you think? – Josh Oct 21 '14 at 06:54
0

I looked over the solutions posted and I think I found a version that serves my purposes. What do you think about this one?

int size = mailbox.size();
for(int i = 0; i < size; i++)
{
    Mail currentMail = mailbox.poll();
    if (!badNews(currentMail))
        mailbox.offer(currentMail);
}

Edit: A new solution that may be problem-free. What do you guys think?

while(true)
{
    boolean badNewRemains = false;

    for(Mail mail: mailbox)
    {
        if(badNews(mail))
        {
            badNewRemains = true;
            mailbox.remove(mail);
            break;
        }
    }

    if(!badNewRemains)
        break;
}
Josh
  • The size of the queue may change since other threads are adding more mails to the queue. Be careful with this. – Luiggi Mendoza Oct 21 '14 at 06:30
  • Yeah, now I see that the elements can be out of order if new data arrives in the middle. I gave it another try above. – Josh Oct 21 '14 at 06:55
0

You can easily implement a queue that fits your needs, and you will have to if the provided API doesn't offer such features.

Something like this:

import java.util.Iterator;
import java.util.LinkedList;


class Mail {
    boolean badMail;
}

class MailQueue {
    private LinkedList<Mail> backingQueue = new LinkedList<>();
    private final Object lock = new Object();

    public void push(Mail mail){
        synchronized (lock) {
            backingQueue.addLast(mail);
            // Wake any threads blocked in pop(). Notifying on every push avoids a
            // missed wake-up when several consumers are waiting at once.
            lock.notifyAll();
        }
    }

    public Mail pop() throws InterruptedException{
        synchronized (lock) {
            while(backingQueue.isEmpty()){
                // no elements in queue, wait.
                lock.wait();
            }
            return backingQueue.removeFirst();
        }
    }

    public boolean removeBadMailsInstantly() {
        synchronized (lock) {
            boolean removed = false;
            Iterator<Mail> iterator = backingQueue.iterator();

            while(iterator.hasNext()){
                Mail mail = iterator.next();
                if(mail.badMail){
                    iterator.remove();
                    removed = true;
                }
            }

            return removed;
        }
    }
}

The queue implemented above is thread-safe for both push and pop, and you can extend it with more operations. The removeBadMailsInstantly method can also be called safely from multiple threads. As a bonus, you will learn some multithreading concepts along the way.

nullptr
  • If there are many pieces of mail all using this class and 'private final Object lock = new Object();', will they block each other or will each lock somehow be unique? – Josh Oct 21 '14 at 14:46
  • @Josh Basically you have to create one instance of this MailQueue and let all threads use that instance, so there will be only one lock, shared by all threads. – nullptr Oct 21 '14 at 19:38
  • @Josh the threads which add to the queue will never block. The threads which retrieve mails from the queue will block only if the queue is empty. The thread which removes bad mails will never block. You can add whatever blocking/non-blocking methods you need. – nullptr Oct 21 '14 at 19:46
0

Iterator was designed for this. Consider the Iterator#remove javadoc: "Removes from the underlying collection the last element returned by this iterator (optional operation)."

Iterator<?> iter = queue.iterator();
while (iter.hasNext()) {
    if (iter.next()...)
        iter.remove();
}
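
A fuller sketch of the iterator approach applied to the question's LinkedBlockingQueue follows. The Mail class, the badNews method, and the class name IteratorRemoveDemo are hypothetical stand-ins for the question's own types. The LinkedBlockingQueue javadoc describes its iterator as weakly consistent, so it should tolerate producers adding mail during the scan without throwing ConcurrentModificationException, and the remaining elements keep their order.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

// Hypothetical demo class, not from the original answer.
class IteratorRemoveDemo {

    /** Hypothetical stand-in for the question's Mail type. */
    static final class Mail {
        final String subject;
        final boolean bad;

        Mail(String subject, boolean bad) {
            this.subject = subject;
            this.bad = bad;
        }
    }

    /** Hypothetical stand-in for the question's badNews(mail) check. */
    static boolean badNews(Mail mail) {
        return mail.bad;
    }

    /** Removes bad mail in place; the remaining mail keeps its order. */
    static void removeBadNews(BlockingQueue<Mail> mailbox) {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            if (badNews(it.next())) {
                it.remove(); // removes the element just returned by next()
            }
        }
    }

    /** Joins the subjects from head to tail, for inspection. */
    static String subjects(BlockingQueue<Mail> mailbox) {
        return mailbox.stream().map(m -> m.subject).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        BlockingQueue<Mail> mailbox = new LinkedBlockingQueue<>();
        mailbox.add(new Mail("a", false));
        mailbox.add(new Mail("b", true));
        mailbox.add(new Mail("c", false));
        removeBadNews(mailbox);
        System.out.println(subjects(mailbox)); // prints "a,c"
    }
}
```

On Java 8 or later, `mailbox.removeIf(IteratorRemoveDemo::badNews)` expresses the same removal in a single call.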
Mike