2

My app has two phases: one downloads some large data and the other manipulates it. So I created two classes that implement Runnable, ImageDownloader and ImageManipulator, which share a downloadedImagesBlockingQueue:

    public class ImageDownloader implements Runnable {

        private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
        private ArrayBlockingQueue<String> imgUrlsBlockingQueue;

        public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) {

            this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
            this.imgUrlsBlockingQueue = imgUrlsBlockingQueue;

        }

        @Override
        public void run() {
            while (!this.imgUrlsBlockingQueue.isEmpty()) {
                try {
                    String imgUrl = this.imgUrlsBlockingQueue.take();
                    ImageBean imageBean = doYourThing(imgUrl);
                    this.downloadedImagesBlockingQueue.add(imageBean);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public class ImageManipulator implements Runnable {

        private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
        private AtomicInteger capacity;

        public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
                                AtomicInteger capacity) {
            this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
            this.capacity = capacity;
        }

        @Override
        public void run() {
            while (capacity.get() > 0) {
                try {
                    ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK
                    capacity.decrementAndGet();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // ....
            }
        }
    }




    public class Main {
        public static void main(String[] args) {
            String[] imageUrls = new String[]{"url1", "url2"};
            int capacity = imageUrls.length;

            ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity);
            ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity);

            ExecutorService downloaderExecutor = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 3; i++) {
                Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue);
                downloaderExecutor.execute(worker);
            }
            downloaderExecutor.shutdown();

            ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(3);
            AtomicInteger manipulatorCapacity = new AtomicInteger(capacity);

            for (int i = 0; i < 3; i++) {
                Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity);
                manipulatorExecutor.execute(worker);
            }
            manipulatorExecutor.shutdown();
            while (!downloaderExecutor.isTerminated() && !manipulatorExecutor.isTerminated()) {
            }
        }
    }

The deadlock happens in this scenario:

t1 checks capacity and sees 1.

t2 checks capacity and sees 1.

t3 checks capacity and sees 1.

t2 takes the element, sets capacity to 0, continues with the flow and eventually exits. t1 and t3 are now blocked forever in take(), because nothing will be added to downloadedImagesBlockingQueue anymore.

What I want is something like this: once the capacity is reached and the queue is empty, break out of the while loop and terminate gracefully.

Using "is the queue empty" as the only condition won't work, because the queue starts out empty until some ImageDownloader puts an ImageBean into it.

user2212726

4 Answers

1

There are a couple of things you can do to prevent the deadlock:

  • Use a LinkedBlockingQueue which has a capacity
  • Use offer to add to the queue which does not block
  • Use drainTo or poll to take items from the queue which are not blocking
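
As a rough illustration of the non-blocking calls in this list, here is a minimal sketch in which the producer side uses offer and the consumer uses the timed poll instead of take, so it gives up once nothing has arrived for a while. The class name PollingConsumerDemo, the String payload and the timeout values are assumptions made up for the example; they are not part of the question's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingConsumerDemo {

    // Consumes items with a timed poll and returns how many were consumed.
    // Hypothetical helper: the timeout is an assumption, tune it to your downloads.
    static int consume(BlockingQueue<String> queue, long timeoutMillis) {
        int consumed = 0;
        try {
            while (true) {
                // poll(timeout, unit) returns null instead of blocking forever
                String item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (item == null) {
                    return consumed; // nothing arrived in time: stop
                }
                consumed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return consumed;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // bounded to 2 elements
        boolean first = queue.offer("img1");
        boolean second = queue.offer("img2");
        boolean third = queue.offer("img3"); // queue is full: offer returns false, no blocking
        System.out.println(first + " " + second + " " + third); // prints "true true false"
        System.out.println(consume(queue, 200));                // prints 2
    }
}
```

Keep in mind that a timeout is only a heuristic: if a download takes longer than the timeout, the consumer stops too early, so this fits best when you have a sensible upper bound on the download time.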

There are also some tips you might want to consider:

  • Use a ThreadPool:
    final ExecutorService executorService = Executors.newFixedThreadPool(4);
  • If you use a fixed-size ThreadPool, you can add "poison pills" to the queue once you have finished adding data, one per thread in the pool, and check for them when you poll
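
The poison pill tip could look roughly like the sketch below. It assumes an unbounded LinkedBlockingQueue of String items and a made-up sentinel POISON_PILL; the class and method names are hypothetical and only serve the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillDemo {

    // Hypothetical sentinel. new String(...) guarantees a unique identity,
    // so a real item with the same text is never mistaken for the pill.
    private static final String POISON_PILL = new String("POISON");

    // Starts `consumers` workers, feeds them `items`, returns the number processed.
    static int run(String[] items, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == POISON_PILL) { // identity comparison on purpose
                            return;                // each worker swallows exactly one pill
                        }
                        processed.incrementAndGet(); // stand-in for the real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // already submitted workers keep running

        for (String item : items) {
            queue.add(item);
        }
        // One pill per worker, added only after all the real data.
        for (int i = 0; i < consumers; i++) {
            queue.add(POISON_PILL);
        }

        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(new String[]{"url1", "url2"}, 3)); // prints 2
    }
}
```

Because the queue is FIFO and the pills are added last, every real item is taken before the first pill, and every worker is guaranteed to see a pill, so none of them stays blocked in take().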

Using a ThreadPool is as simple as this:

    final ExecutorService executorService = Executors.newFixedThreadPool(4);

    final Future<?> result = executorService.submit(new Runnable() {
        @Override
        public void run() {

        }
    });

There is also the lesser-known ExecutorCompletionService, which abstracts this whole process. More info here.
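
To give an idea of how ExecutorCompletionService might replace the hand-made queue between the two phases, here is a minimal sketch. The download method is a hypothetical stand-in for the real download, and toUpperCase stands in for the manipulation step; neither comes from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {

    // Hypothetical stand-in for the real download.
    static String download(String url) {
        return "image:" + url;
    }

    // Downloads all URLs in parallel and "manipulates" each result as soon as it is ready.
    static List<String> downloadAndManipulate(String[] urls) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
        List<String> results = new ArrayList<>();
        try {
            for (String url : urls) {
                completionService.submit(() -> download(url));
            }
            // We know exactly how many results to expect, so no counter is shared
            // between threads and nobody waits for an element that never comes.
            for (int i = 0; i < urls.length; i++) {
                String image = completionService.take().get(); // next finished download
                results.add(image.toUpperCase());              // stand-in for the manipulation
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("download failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = downloadAndManipulate(new String[]{"url1", "url2"});
        System.out.println(results.size()); // prints 2 (the order of the results may vary)
    }
}
```

The results come back in completion order, not submission order, so do not rely on their position in the list.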

Community
Adam Arold
0

You don't need the capacity in your consumer. It is currently read and updated by multiple threads, which causes the synchronization issue.

  1. initImgUrlsBlockingQueue creates the URL blocking queue holding capacity URL items. (Right?)
  2. ImageDownloader consumes imgUrlsBlockingQueue and produces images. It terminates when all the URLs have been downloaded or, if capacity means the number of images that should be downloaded (because some downloads may fail), when it has added capacity images.
  3. Before ImageDownloader terminates, it adds a marker to downloadedImagesBlockingQueue, for example a dedicated sentinel instance declared as static final ImageBean marker = new ImageBean(). (A null element cannot serve as the marker, because BlockingQueue implementations reject null.)
  4. Every ImageManipulator drains the queue using the following construct; when it sees the marker, it adds the marker back to the queue and terminates.

    // use identity comparison
    while ((imageBean = downloadedImagesBlockingQueue.take()) != marker) {
       // process image
    }
    downloadedImagesBlockingQueue.add(marker);
    

Note that a BlockingQueue guarantees that each individual method call is atomic. However, if you first check the capacity and then consume an element based on that check, the combined sequence of actions is not atomic.
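
Put together, the marker idea could look like the following sketch. It makes a few assumptions that are not in the question: a minimal stand-in ImageBean class, a single thread that fills the queue, and the marker being added only once, after all the images. With several downloaders you would have to make sure the marker goes in after the last one has finished.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MarkerRelayDemo {

    // Minimal stand-in for the question's ImageBean (hypothetical).
    static final class ImageBean {
        final String url;
        ImageBean(String url) { this.url = url; }
    }

    // End-of-stream marker, recognised by identity only.
    static final ImageBean MARKER = new ImageBean("end-of-stream");

    static int run(String[] urls, int manipulators) {
        BlockingQueue<ImageBean> downloaded = new ArrayBlockingQueue<>(Math.max(1, urls.length));
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(manipulators);

        for (int i = 0; i < manipulators; i++) {
            pool.execute(() -> {
                try {
                    ImageBean bean;
                    while ((bean = downloaded.take()) != MARKER) {
                        processed.incrementAndGet(); // stand-in for processing bean
                    }
                    // Relay the marker so the next manipulator can terminate too.
                    // Taking it just freed a slot, so put will not block for long.
                    downloaded.put(MARKER);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // already submitted manipulators keep running

        try {
            for (String url : urls) {
                downloaded.put(new ImageBean(url)); // stand-in for the downloader
            }
            downloaded.put(MARKER); // only after every image is in the queue
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(new String[]{"url1", "url2"}, 3)); // prints 2
    }
}
```

The sketch uses put rather than add, so a temporarily full ArrayBlockingQueue makes the caller wait instead of throwing an IllegalStateException.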

Arie Xiao
  • No BlockingQueue implementations can have null elements: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#add-E- – John H Jun 10 '16 at 00:07
  • 1
    It's just an example. As I said, a **marker** element. You can declare a static final field `static final ImageBean marker = new ImageBean()`, use it as the marker, and compare by identity: `while ((imageBean = downloadedImagesBlockingQueue.take()) != marker) ...` – Arie Xiao Jun 10 '16 at 00:16
0

Well, I used some of the suggested features, but this is the complete solution that worked for me: it does not busy-wait, and it waits until the downloader notifies it.

public ImageManipulator(LinkedBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
                        LinkedBlockingQueue<ImageBean> manipulatedImagesBlockingQueue,
                        AtomicInteger capacity,
                        ManipulatedData manipulatedData,
                        ReentrantLock downloaderReentrantLock,
                        ReentrantLock manipulatorReentrantLock,
                        Condition downloaderNotFull,
                        Condition manipulatorNotFull) {

    this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
    this.manipulatedImagesBlockingQueue = manipulatedImagesBlockingQueue;
    this.capacity = capacity;
    this.downloaderReentrantLock = downloaderReentrantLock;
    this.manipulatorReentrantLock = manipulatorReentrantLock;
    this.downloaderNotFull = downloaderNotFull;
    this.manipulatorNotFull = manipulatorNotFull;
    this.manipulatedData = manipulatedData;
}

@Override
public void run() {
    while (capacity.get() > 0) {
        ImageBean imageBean = null;
        downloaderReentrantLock.lock();
        try {
            if (capacity.get() > 0) { // re-check under the lock: another manipulator may have updated the value

                imageBean = downloadedImagesBlockingQueue.poll();

                if (imageBean != null) { // null means no downloader has finished its work yet (successfully or not)
                    if (capacity.decrementAndGet() == 0) { // signal all the manipulators to wake up and stop waiting for downloaded images
                        downloaderNotFull.signalAll();
                    }
                } else {
                    // wait for a downloaded image; the downloader calls signalAllManipulators (same as signalAllPersisters() here) when an imageBean is inserted into the queue
                    downloaderNotFull.await();
                }
            }
        } catch (InterruptedException e) {
            logger.log(Level.ERROR, e.getMessage(), e);
        } finally {
            // always release the lock, also when capacity already reached 0 or await() was interrupted
            downloaderReentrantLock.unlock();
        }

        if (imageBean != null) { // process outside the lock
            if (imageBean.getOriginalImage() != null) { // the downloader sets it to null iff it fails to download the image

                // business logic
            }

            manipulatedImagesBlockingQueue.add(imageBean);

            signalAllPersisters(); // signal the persisters (which use the same lock/unlock pattern as this manipulator)
        }
    }

    logger.log(Level.INFO, "Manipulator: " + Thread.currentThread().getId() + "  Ended Gracefully");
}

private void signalAllPersisters() {
    manipulatorReentrantLock.lock();
    try {
        manipulatorNotFull.signalAll();
    } finally {
        manipulatorReentrantLock.unlock(); // release the lock even if signalAll() throws
    }
}

For full flow you can check this project on my github: https://github.com/roy-key/image-service/

user2212726
-1

Your issue is that you are using a counter to track queue elements without composing the operations that need to be atomic. You do check, take, decrement as three separate steps. This lets the queue size and the counter get out of sync, and your threads block forever. It would be better to write a synchronization primitive that is 'closeable', so that you don't have to maintain an associated counter. As a quick fix, however, you can get and decrement the counter atomically:

while (capacity.getAndDecrement() > 0) {
    try {
        ImageBean imageBean = downloadedImagesBlockingQueue.take();

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

In this case, if there are 3 threads and only one element left in the queue, only one thread will atomically decrement the counter and see that it can take without blocking. The other two threads will see 0 or a negative value and break out of the loop.
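
If you want to convince yourself, here is a small self-contained sketch of the quick fix with more consumers than elements. The class name AtomicClaimDemo and the String payload are made up for the example, and the queue is pre-filled to keep it short; in your app the downloaders would fill it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicClaimDemo {

    // Returns the number of processed items, or -1 if the consumers did not finish in time.
    static int run(int items, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(Math.max(1, items));
        for (int i = 0; i < items; i++) {
            queue.add("image" + i);
        }
        AtomicInteger capacity = new AtomicInteger(items);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    // Claim an element and decrement in ONE atomic step, so at most
                    // `items` calls to take() are ever made across all threads.
                    while (capacity.getAndDecrement() > 0) {
                        queue.take();
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();

        try {
            boolean finished = pool.awaitTermination(10, TimeUnit.SECONDS);
            return finished ? processed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(2, 3)); // prints 2: three consumers, two elements, nobody hangs
    }
}
```

Note that the counter can end up negative (each thread decrements once more before leaving the loop), which is harmless here because it is only ever compared against 0.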

You also need to make all of your classes' instance fields final so that they have the correct memory visibility. You should also decide how you are going to handle interrupts rather than relying on the default printStackTrace template.

John H