I'm trying to implement a download queue using IntentService
which is backed by a synchronised ArrayList
. The actual download is performed by class Downloader
which is a subclass of AsyncTask
. However iteration of the downloadQueue
always fails when the first download in the queue is completed. Here's my attempt at synchronisation.
public class DownloadIntentService extends IntentService {
............
............
private final List<Downloader> downloadQueue
= Collections.synchronizedList(new ArrayList<Downloader>());
public DownloadIntentService() {
super(TAG);
}
............
............
@Override
protected void onHandleIntent(Intent intent) {
if (intent != null) {
final String action = intent.getAction();
if (ACTION_DOWNLOAD.equals(action)) {
.............
.............
handleActionAddDownload(id, url, title, videoId, formatId, extension, thumb);
} else if (ACTION_PAUSE_DOWNLOAD.equals(action)) {
final String[] ids = intent.getStringArrayExtra(EXTRA_ID);
handleActionBaz(ids);
}
}
}
/**
* Handle action Foo in the provided background thread with the provided
* parameters.
*/
private void handleActionAddDownload(final String id,
final String url,
final String title,
final String videoId,
final String formatId,
final String extension,
final String thumb) {
..............
..............
resumeDownload(id);
realm.close();
}
/**
* Takes a download id, adds it to the queue if possible and resumes the download.
*
* @param id The id.
*/
private void resumeDownload(String id) {
//noinspection StatementWithEmptyBody
if (isQueueOpen()) {
// URL already exists, try to resume it.
// FIXME: 21/8/16
if (!isInQueue(id)) {
synchronized (downloadQueue) {
downloadQueue.add(new Downloader(id));
}
} else {
// Download is already there in the queue. Multiple clicks?
Log.e(TAG, "resumeDownload: Download is already there in the queue. " +
"Multiple clicks?");
}
if (!ensureSdCard()) {
Log.e(TAG, "resumeDownload: " + "Cannot ensure sdcard" );
Toast.makeText(this, R.string.sd_not_available,
Toast.LENGTH_LONG).show();
return;
}
checkDownloadQueue();
} else {
// do nothing.
}
}
/**
* Check the database and clean queue, add downloads to queue if possible, start downloads in
* queue if not already started.
*/
private void checkDownloadQueue() {
Realm realm = Realm.getDefaultInstance();
// Clean queue
synchronized (downloadQueue) {
for (Iterator<Downloader> it = downloadQueue.iterator(); it.hasNext();) {
Downloader downloader = it.next();
final Download download = Utils.getDownloadById(realm, downloader.getId());
switch (download.getStatus()) {
case Download.DONE:
case Download.FAILED:
case Download.PAUSED:
// Remove processed downloads
if (!downloader.isCancelled()) {
downloader.cancel(true);
}
downloadQueue.remove(downloader);
break;
case Download.WAITING_FOR_QUEUE:
// Start pending downloads.
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
download.setStatus(Download.DOWNLOADING);
realm.copyToRealmOrUpdate(download);
}
});
startDownloaderInQueue(downloader.getId());
break;
case Download.DOWNLOADING:
if (downloader.getStatus().equals(AsyncTask.Status.PENDING)) {
startDownloaderInQueue(downloader.getId());
} else if (downloader.getStatus().equals(AsyncTask.Status.FINISHED)) {
if (!downloader.isCancelled()) {
downloader.cancel(true);
}
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
download.setStatus(Download.FAILED);
realm.copyToRealmOrUpdate(download);
}
});
downloadQueue.remove(downloader);
} else {
// Do nothing, task is running.
}
break;
}
}
// Add new downloads to queue if any.
if (isQueueOpen()) {
int diff = QUEUE_SIZE - downloadQueue.size();
RealmResults<Download> downloads = realm
.where(Download.class)
.findAll()
.sort("position", Sort.ASCENDING);
for (int i = 0; (diff > 0 && i < downloads.size()); i++) {
final Download download = downloads.get(i);
if (download.getStatus() == Download.WAITING_FOR_QUEUE) {
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
download.setStatus(Download.DOWNLOADING);
realm.copyToRealmOrUpdate(download);
}
});
Downloader downloader = new Downloader(download.getId());
downloadQueue.add(downloader);
startDownloaderInQueue(download.getId());
diff--;
} else if (download.getStatus() == Download.DOWNLOADING) {
if (isInQueue(download.getId())) {
Downloader downloader = new Downloader(download.getId());
downloadQueue.add(downloader);
startDownloaderInQueue(download.getId());
diff--;
}
}
}
}
}
realm.close();
}
/**
* Start a download in queue
*
* @param id an id that exists in download queue.
*/
private void startDownloaderInQueue(@NonNull String id) {
if (!isInQueue(id)) throw new IllegalArgumentException("URL is not in queue");
Downloader downloader = getDownloaderFromQueue(id);
Log.d(TAG, "startDownloaderInQueue: " + downloader.getStatus());
if (AsyncTask.Status.PENDING.equals(downloader.getStatus())) {
downloader.execute();
} else if (AsyncTask.Status.FINISHED.equals(downloader.getStatus())) {
throw new IllegalStateException("Downloader already executed");
}
}
private Downloader getDownloaderFromQueue(String id) {
synchronized (downloadQueue) {
for (Downloader downloader : downloadQueue) {
if (id.equals(downloader.getId())) return downloader;
}
}
throw new IllegalArgumentException(id + "wasn't found in queue.");
}
/**
* Check if a url is in the queue.
*
* @param id The id.
* @return weather a id is in the queue or not.
*/
private boolean isInQueue(@NonNull String id) {
synchronized (downloadQueue) {
for (Downloader downloader : downloadQueue) {
if (id.equals(downloader.getId()))
return true;
}
}
return false;
}
private boolean isQueueOpen() {
synchronized (downloadQueue) {
return downloadQueue.size() < QUEUE_SIZE;
}
}
private class Downloader extends BaseDownloader {
public Downloader(String id) {
super(id);
}
@Override
protected void onPostExecute(final Integer status) {
super.onPostExecute(status);
Realm realm = Realm.getDefaultInstance();
final Download download = Utils.getDownloadById(realm, getId());
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
download.setStatus(status);
realm.copyToRealmOrUpdate(download);
}
});
realm.close();
checkDownloadQueue();
}
@Override
protected void onProgressUpdate(DownloadProgress... values) {
super.onProgressUpdate(values);
Realm realm = Realm.getDefaultInstance();
final DownloadProgress progress = values[0];
final Download download = Utils.getDownloadById(realm, getId());
Log.d(TAG, "onProgressUpdate: " + progress);
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
download.updateProgress(progress);
realm.copyToRealmOrUpdate(download);
}
});
realm.close();
}
}
}
This is the error I'm getting:
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.next(ArrayList.java:831)
at com.vibhinna.sreni.DownloadIntentService.checkDownloadQueue(DownloadIntentService.java:261)
at com.vibhinna.sreni.DownloadIntentService.access$000(DownloadIntentService.java:35)
at com.vibhinna.sreni.DownloadIntentService$Downloader.onPostExecute(DownloadIntentService.java:420)
at com.vibhinna.sreni.DownloadIntentService$Downloader.onPostExecute(DownloadIntentService.java:402)
at android.os.AsyncTask.finish(AsyncTask.java:660)
at android.os.AsyncTask.-wrap1(AsyncTask.java)
at android.os.AsyncTask$InternalHandler.handleMessage(AsyncTask.java:677)
at android.os.Handler.dispatchMessage(Handler.java:102)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6044)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:865)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:755)
What am I doing wrong? I'm not sure if its legal to call a method that contains a synchronised block from a synchronised block.
Complete class source : http://pastebin.com/aijT62Yx