I have a data migration service that reads data from one database in chunks and migrates it into another database.
To work through the chunks of data, I am trying to use a RecursiveAction and a ForkJoinPool. The reason for this is that I would like to execute the work on each chunk in parallel: process a chunk, fetch the next chunk, process it, and so on.
What's happening is my process just stops. I see no exceptions in the logs, and I see no deadlocked threads. My code is below, my questions are:
- Am I missing something in my Worker? Do I need to return some value or call some method to free up a resource?
- Is it really dumb for me to be using a RecursiveAction here? Should I be using something different to parallelize chunks of work?
- I have one worker thread in my thread dump that seems to just be waiting; is this normal, or is it an indicator of my problem?
ForkJoinPool-1-worker-18 id=12191 state=WAITING
- waiting on <0x1b5ca93e> (a java.util.concurrent.ForkJoinPool)
- locked <0x1b5ca93e> (a java.util.concurrent.ForkJoinPool)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.ForkJoinPool.tryAwaitWork(ForkJoinPool.java:864)
at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:647)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
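For what it's worth, a worker parked inside ForkJoinPool like that is what idle workers look like between submissions, so on its own it need not indicate a hang. A minimal standalone sketch (not my service, just a trivial action) shows a pool completing work while its spare workers sit in WAITING:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Minimal sketch: invoke a trivial RecursiveAction and confirm it completes.
// After invoke() returns, the pool's workers have nothing to do and park in
// a WAITING state much like the thread-dump entry above, which is expected.
public class IdleWorkerDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        RecursiveAction noop = new RecursiveAction() {
            @Override
            protected void compute() {
                // trivial work; real code would migrate a chunk here
            }
        };
        pool.invoke(noop); // blocks the caller until the action finishes
        System.out.println("done=" + noop.isDone());
        pool.shutdown();
    }
}
```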
Code:
@Component
public class BulkMigrationService {

    private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class);

    private final ForkJoinPool pool = new ForkJoinPool();
    private final SourceDataApi api;
    private final Migrator migrator;
    private final MetadataService metadataService;

    @Autowired
    public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) {
        this.api = api;
        this.migrator = migrator;
        this.metadataService = metadataService;
    }

    public void migrate(Integer batchSize, Long max) throws MigrationException {
        Long currentCount = 0L;
        Integer currentIndex = 0;
        while (currentCount < max) {
            List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize);
            if (itemsToMigrate.size() > 0) {
                MigrateForkedWorker starter = new MigrateForkedWorker(itemsToMigrate);
                pool.invoke(starter);
            }
            currentCount += itemsToMigrate.size();
            currentIndex += batchSize - 1;
            if (log.isDebugEnabled()) {
                log.debug("Migrated " + currentCount + " Items.");
            }
        }
    }

    public class MigrateForkedWorker extends RecursiveAction {

        private final int max = 10;
        private final List<String> allItems;

        public MigrateForkedWorker(List<String> allItems) {
            this.allItems = allItems;
        }

        @Override
        protected void compute() {
            if (allItems.size() <= max) {
                for (String itemInfo : allItems) {
                    try {
                        migrator.migrateAsset(itemInfo);
                    }
                    catch (MigrationException e) {
                        e.printStackTrace();
                    }
                }
            }
            else {
                // ceiling of size / 2, so both halves fit in two partitions
                int targetSize = (allItems.size() + 1) / 2;
                List<List<String>> splits = Lists.partition(allItems, targetSize);
                MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0));
                MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1));
                invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo);
            }
        }
    }
}
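On the "should I be using something different" question: since each chunk is just a flat list with no real divide-and-conquer structure, I've been wondering whether a plain fixed thread pool would do the same job with less machinery. A sketch of that alternative (migrateOne here is a hypothetical stand-in for migrator.migrateAsset):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChunkMigrateSketch {
    // Hypothetical stand-in for migrator.migrateAsset(item).
    static void migrateOne(String item) {
        System.out.println("migrated " + item);
    }

    public static void main(String[] args) throws Exception {
        List<String> chunk = Arrays.asList("a", "b", "c", "d");
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Void>> tasks = new ArrayList<>();
        for (String item : chunk) {
            tasks.add(() -> {
                migrateOne(item);
                return null;
            });
        }
        // invokeAll blocks until every task in this chunk completes, so the
        // caller can safely fetch and submit the next chunk afterwards.
        pool.invokeAll(tasks);
        pool.shutdown();
        System.out.println("chunk done, size=" + chunk.size());
    }
}
```

The blocking invokeAll gives the same fetch-a-chunk, process-in-parallel, fetch-the-next rhythm as pool.invoke(starter) does in my code above.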