-1

I'm trying to implement a download queue using IntentService which is backed by a synchronised ArrayList. The actual download is performed by class Downloader which is a subclass of AsyncTask. However iteration of the downloadQueue always fails when the first download in the queue is completed. Here's my attempt at synchronisation.

public class DownloadIntentService extends IntentService {

............
............

    private final List<Downloader> downloadQueue
            = Collections.synchronizedList(new ArrayList<Downloader>());

    public DownloadIntentService() {
        super(TAG);
    }

............
............

    @Override
    protected void onHandleIntent(Intent intent) {
        if (intent != null) {
            final String action = intent.getAction();
            if (ACTION_DOWNLOAD.equals(action)) {
                .............
                .............

                handleActionAddDownload(id, url, title, videoId, formatId, extension, thumb);
            } else if (ACTION_PAUSE_DOWNLOAD.equals(action)) {
                final String[] ids = intent.getStringArrayExtra(EXTRA_ID);
                handleActionBaz(ids);
            }
        }
    }

    /**
     * Handle action Foo in the provided background thread with the provided
     * parameters.
     */
    private void handleActionAddDownload(final String id,
                                         final String url,
                                         final String title,
                                         final String videoId,
                                         final String formatId,
                                         final String extension,
                                         final String thumb) {
        ..............
        ..............
        resumeDownload(id);
        realm.close();
    }

    /**
     * Takes a download id, adds it to the queue if possible and resumes the download.
     *
     * @param id The id.
     */
    private void resumeDownload(String id) {
        //noinspection StatementWithEmptyBody
        if (isQueueOpen()) {
            // URL already exists, try to resume it.
            // FIXME: 21/8/16
            if (!isInQueue(id)) {
                synchronized (downloadQueue) {
                    downloadQueue.add(new Downloader(id));
                }
            } else {
                // Download is already there in the queue. Multiple clicks?
                Log.e(TAG, "resumeDownload: Download is already there in the queue. " +
                        "Multiple clicks?");

            }
            if (!ensureSdCard()) {
                Log.e(TAG, "resumeDownload: " + "Cannot ensure sdcard" );
                Toast.makeText(this, R.string.sd_not_available,
                        Toast.LENGTH_LONG).show();
                return;
            }
            checkDownloadQueue();
        } else {
            // do nothing.
        }
    }

    /**
     * Check the database and clean queue, add downloads to queue if possible, start downloads in
     * queue if not already started.
     */
    private void checkDownloadQueue() {
        Realm realm = Realm.getDefaultInstance();
        // Clean queue
        synchronized (downloadQueue) {
            for (Iterator<Downloader> it = downloadQueue.iterator(); it.hasNext();) {
                Downloader downloader = it.next();
                final Download download = Utils.getDownloadById(realm, downloader.getId());
                switch (download.getStatus()) {
                    case Download.DONE:
                    case Download.FAILED:
                    case Download.PAUSED:
                        // Remove processed downloads
                        if (!downloader.isCancelled()) {
                            downloader.cancel(true);
                        }
                        downloadQueue.remove(downloader);
                        break;
                    case Download.WAITING_FOR_QUEUE:
                        // Start pending downloads.
                        realm.executeTransaction(new Realm.Transaction() {
                            @Override
                            public void execute(Realm realm) {
                                download.setStatus(Download.DOWNLOADING);
                                realm.copyToRealmOrUpdate(download);
                            }
                        });
                        startDownloaderInQueue(downloader.getId());
                        break;
                    case Download.DOWNLOADING:
                        if (downloader.getStatus().equals(AsyncTask.Status.PENDING)) {
                            startDownloaderInQueue(downloader.getId());
                        } else if (downloader.getStatus().equals(AsyncTask.Status.FINISHED)) {
                            if (!downloader.isCancelled()) {
                                downloader.cancel(true);
                            }
                            realm.executeTransaction(new Realm.Transaction() {
                                @Override
                                public void execute(Realm realm) {
                                    download.setStatus(Download.FAILED);
                                    realm.copyToRealmOrUpdate(download);
                                }
                            });
                            downloadQueue.remove(downloader);
                        } else {
                            // Do nothing, task is running.
                        }
                        break;
                }
            }

            // Add new downloads to queue if any.
            if (isQueueOpen()) {
                int diff = QUEUE_SIZE - downloadQueue.size();
                RealmResults<Download> downloads = realm
                        .where(Download.class)
                        .findAll()
                        .sort("position", Sort.ASCENDING);
                for (int i = 0; (diff > 0 && i < downloads.size()); i++) {
                    final Download download = downloads.get(i);
                    if (download.getStatus() == Download.WAITING_FOR_QUEUE) {
                        realm.executeTransaction(new Realm.Transaction() {
                            @Override
                            public void execute(Realm realm) {
                                download.setStatus(Download.DOWNLOADING);
                                realm.copyToRealmOrUpdate(download);
                            }
                        });
                        Downloader downloader = new Downloader(download.getId());
                        downloadQueue.add(downloader);
                        startDownloaderInQueue(download.getId());
                        diff--;
                    } else if (download.getStatus() == Download.DOWNLOADING) {
                        if (isInQueue(download.getId())) {
                            Downloader downloader = new Downloader(download.getId());
                            downloadQueue.add(downloader);
                            startDownloaderInQueue(download.getId());
                            diff--;
                        }
                    }
                }
            }
        }
        realm.close();
    }

    /**
     * Start a download in queue
     *
     * @param id an id that exists in download queue.
     */
    private void startDownloaderInQueue(@NonNull String id) {
        if (!isInQueue(id)) throw new IllegalArgumentException("URL is not in queue");
        Downloader downloader = getDownloaderFromQueue(id);
        Log.d(TAG, "startDownloaderInQueue: " + downloader.getStatus());
        if (AsyncTask.Status.PENDING.equals(downloader.getStatus())) {
            downloader.execute();
        } else if (AsyncTask.Status.FINISHED.equals(downloader.getStatus())) {
            throw new IllegalStateException("Downloader already executed");
        }
    }

    private Downloader getDownloaderFromQueue(String id) {
        synchronized (downloadQueue) {
            for (Downloader downloader : downloadQueue) {
                if (id.equals(downloader.getId())) return downloader;
            }
        }
        throw new IllegalArgumentException(id + "wasn't found in queue.");
    }

    /**
     * Check if a url is in the queue.
     *
     * @param id The id.
     * @return weather a id is in the queue or not.
     */
    private boolean isInQueue(@NonNull String id) {
        synchronized (downloadQueue) {
            for (Downloader downloader : downloadQueue) {
                if (id.equals(downloader.getId()))
                    return true;
            }
        }
        return false;
    }

    private boolean isQueueOpen() {
        synchronized (downloadQueue) {
            return downloadQueue.size() < QUEUE_SIZE;
        }
    }

    private class Downloader extends BaseDownloader {
        public Downloader(String id) {
            super(id);
        }

        @Override
        protected void onPostExecute(final Integer status) {
            super.onPostExecute(status);
            Realm realm = Realm.getDefaultInstance();
            final Download download = Utils.getDownloadById(realm, getId());
            realm.executeTransaction(new Realm.Transaction() {
                @Override
                public void execute(Realm realm) {
                    download.setStatus(status);
                    realm.copyToRealmOrUpdate(download);
                }
            });
            realm.close();
            checkDownloadQueue();
        }

        @Override
        protected void onProgressUpdate(DownloadProgress... values) {
            super.onProgressUpdate(values);
            Realm realm = Realm.getDefaultInstance();
            final DownloadProgress progress = values[0];
            final Download download = Utils.getDownloadById(realm, getId());
            Log.d(TAG, "onProgressUpdate: " + progress);
            realm.executeTransaction(new Realm.Transaction() {
                @Override
                public void execute(Realm realm) {
                    download.updateProgress(progress);
                    realm.copyToRealmOrUpdate(download);
                }
            });
            realm.close();
        }
    }
}

This is the error I'm getting:

java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.next(ArrayList.java:831)
    at com.vibhinna.sreni.DownloadIntentService.checkDownloadQueue(DownloadIntentService.java:261)
    at com.vibhinna.sreni.DownloadIntentService.access$000(DownloadIntentService.java:35)
    at com.vibhinna.sreni.DownloadIntentService$Downloader.onPostExecute(DownloadIntentService.java:420)
    at com.vibhinna.sreni.DownloadIntentService$Downloader.onPostExecute(DownloadIntentService.java:402)
    at android.os.AsyncTask.finish(AsyncTask.java:660)
    at android.os.AsyncTask.-wrap1(AsyncTask.java)
    at android.os.AsyncTask$InternalHandler.handleMessage(AsyncTask.java:677)
    at android.os.Handler.dispatchMessage(Handler.java:102)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6044)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:865)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:755)

What am I doing wrong? I'm not sure if its legal to call a method that contains a synchronised block from a synchronised block.

Complete class source : http://pastebin.com/aijT62Yx

Binoy Babu
  • 16,699
  • 17
  • 91
  • 134

3 Answers3

2

You are getting the exception since you are removing an item from the underling collection (your list) while traversing it with the iterator.

Iterator.remove() is the only safe way to modify a collection during iteration; the behavior is unspecified if the underlying collection is modified in any other way while the iteration is in progress.

Nir Levy
  • 12,750
  • 3
  • 21
  • 38
1

You are modifiing the list you are currently iterating directly. This is not allowed. You must use the it.remove() function to change the list while iterating with an Iterator. If you want to modify the list during iteration, do not use the Iterator, use direct index iteration or use a copy of the list and manipulate the original one during iteration on the copy.

thst
  • 4,592
  • 1
  • 26
  • 40
1

in method checkDownloadQueue instead of using downloadQueue.remove(downloader) use iterator's remove method .

Rohit
  • 89
  • 6