
I am trying to load a database into memory quickly using CompletableFutures. I start the Spring transaction at the method level:

@Transactional()
private void loadErUp() {
    StopWatch sw = StopWatch.createStarted();
    List<CompletableFuture<Void>> calls = new ArrayList<>();
    final ZonedDateTime zdt = ZonedDateTime.now(ZoneId.of(ZoneOffset.UTC.getId())).minusMinutes(REFRESH_OVERLAP);

    for (long i = 1; i < 12 + 1; i++) {
        Long holder = i;
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            this.loadPartition(holder, zdt);
        }, this.forkJoinPool);
        calls.add(future);
    }
    CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
    log.info("All data refreshed in ({}ms) since:{}", sw.getTime(), zdt.format(DateTimeFormatter.ISO_INSTANT));
}

Then I attach each worker thread to the main transaction by calling `TransactionSynchronizationManager.setActualTransactionActive(true);` inside `loadPartition()`:

private <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
    log.debug("Refresh thread start:{}", partitionKey);
    TransactionSynchronizationManager.setActualTransactionActive(true);
    StopWatch sw = StopWatch.createStarted();

    try (Stream<Authority> authorityStream = aSqlRepo.findByPartitionKeyAndLastUpdatedTimeStampAfter(partitionKey, zdt)) {
        long count = authorityStream.peek(a -> {
            this.authorityRepository.set(this.GSON.fromJson(a.getJsonData(), AssetAuthority.class));
        }).count();
        log.info("Partition {} refreshed in ({}ms) with {} items.", partitionKey, sw.getTime(), count);
        return count;
    }
}

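I run this batch job every 30 seconds. On the 9th run only 4 threads get a connection, and then the job hangs because the remaining threads are waiting for a pool connection to open up (12 threads × 8 runs = 96 connections already held, plus those 4 = 100, the pool size). I get: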

Unable to acquire JDBC Connection; Unable to fetch a connection in 30 seconds, none available[size:100; busy:100; idle:0; lastwait:30000].

So evidently the connections are never committed and released. I thought it might be because I use my own ForkJoinPool; however, I shut down all those threads and it didn't seem to help. I have also tried putting another method underneath loadPartition(), but that didn't seem to help either. There is another thread about how to get transactions to work, but mine do work; they just don't commit.
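The arithmetic behind the hang can be reproduced with a small toy model. This is a sketch under stated assumptions, not the real pool: a `java.util.concurrent.Semaphore` with 100 permits stands in for the JDBC pool (the `size:100` from the error), each of the 12 workers per run takes one permit and never returns it (as happens when a transaction never ends), and `tryAcquire()` replaces the pool's 30-second wait so nothing blocks. The class name `PoolExhaustionDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Toy model (assumption): a 100-permit Semaphore plays the JDBC pool and
// every worker leaks the connection it borrows.
public class PoolExhaustionDemo {
    static final int POOL_SIZE = 100;      // from the error: size:100
    static final int THREADS_PER_RUN = 12; // 12 partitions per run

    // Returns how many workers managed to get a connection in each run.
    public static List<Integer> simulate(int runs) {
        Semaphore pool = new Semaphore(POOL_SIZE);
        List<Integer> acquiredPerRun = new ArrayList<>();
        for (int run = 1; run <= runs; run++) {
            int acquired = 0;
            for (int t = 0; t < THREADS_PER_RUN; t++) {
                // tryAcquire() instead of a 30 s blocking wait, so the demo never hangs
                if (pool.tryAcquire()) {
                    acquired++; // leaked: there is no matching release()
                }
            }
            acquiredPerRun.add(acquired);
        }
        return acquiredPerRun;
    }

    public static void main(String[] args) {
        List<Integer> perRun = simulate(9);
        for (int i = 0; i < perRun.size(); i++) {
            System.out.println("run " + (i + 1) + ": " + perRun.get(i) + " connections acquired");
        }
    }
}
```

Runs 1 to 8 each get 12 connections (96 in total), run 9 gets the last 4, and every later run gets none, which matches `busy:100; idle:0`.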

markthegrea
  • Well, I guess Spring uses the ThreadLocal pattern for transaction/connection-pool handling, and since you use your own ForkJoinPool, nobody cleans up. I would suggest using the Spring `@Async` annotation in combination with `CompletableFuture`. See https://stackoverflow.com/questions/47351435/spring-async-with-completablefuture – Peter Dec 10 '18 at 17:14
  • Tried changing to Spring managing the futures. No joy. – markthegrea Dec 10 '18 at 17:45
  • You cannot have a single transaction span multiple threads. That is simply not going to work. Also, you aren't synchronizing anything with the call in your method. Basically your setup works without a transaction, as `@Transactional` on a `private` method is ignored (unless you use AspectJ compile-time or load-time weaving). – M. Deinum Dec 11 '18 at 07:31
  • It must be working somehow. The database call is a Stream and won't work without a transaction being open. Any suggestions? – markthegrea Dec 11 '18 at 13:54
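Both comments come down to thread confinement: Spring keeps the current transaction and its connection in thread-bound state (held in `ThreadLocal`s by `TransactionSynchronizationManager`), so a worker thread in a separate pool cannot see the caller's transaction. The JDK-only sketch below shows the effect with a plain `ThreadLocal`; `CURRENT_TX` and `ThreadLocalDemo` are hypothetical names, and this is not Spring's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class ThreadLocalDemo {
    // Hypothetical stand-in for the thread-bound transaction/connection state.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Binds "tx-main" to the calling thread, then asks a pool worker what it sees.
    public static String seenByWorker(ForkJoinPool pool) {
        CURRENT_TX.set("tx-main");
        try {
            return CompletableFuture
                    .supplyAsync(CURRENT_TX::get, pool) // executes on a pool thread
                    .join();
        } finally {
            CURRENT_TX.remove(); // clean up the caller's binding
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("worker sees: " + seenByWorker(pool)); // worker sees: null
        } finally {
            pool.shutdown();
        }
    }
}
```

The worker sees `null`. As far as I can tell, `setActualTransactionActive(true)` likewise only flips a thread-local flag on the worker thread; it does not hand that thread the caller's connection or make anything responsible for committing and releasing it.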

1 Answer

If you want each #loadPartition call to run on its own thread and in its own transaction, you'll need to:

  1. Mark #loadPartition as @Transactional
  2. Invoke the proxied #loadPartition method so that @Transactional takes effect. You can do this either by self-autowiring or by calling the method from another proxied class.

The transaction is not getting applied in the asynchronous threads because (important!) the call to that method does not go through the proxy.
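Self-invocation can be observed without Spring at all. The sketch below uses a plain JDK dynamic proxy (`java.lang.reflect.Proxy`) as a stand-in for Spring's transactional proxy; the interface `Loader`, the class `SelfInvocationDemo`, and the handler that merely records method names (where Spring would begin and commit a transaction) are all hypothetical. Spring may use CGLIB subclass proxies rather than JDK interface proxies, but calls made through `this` bypass either kind.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SelfInvocationDemo {
    public interface Loader {
        void loadErUp();
        void loadPartition(long key);
    }

    // Method names the "proxy" intercepted (where a transaction would start).
    static final List<String> INTERCEPTED = Collections.synchronizedList(new ArrayList<>());

    static class LoaderImpl implements Loader {
        Loader self;     // the proxy, injected after creation (like self-autowiring)
        boolean useSelf; // which call style loadErUp() uses

        public void loadErUp() {
            if (useSelf) {
                self.loadPartition(1L); // goes through the proxy
            } else {
                this.loadPartition(1L); // plain Java call, bypasses the proxy
            }
        }

        public void loadPartition(long key) {
            // load rows for this partition
        }
    }

    // Builds target + JDK proxy, calls loadErUp() via the proxy, and
    // returns the names of the methods the proxy saw.
    public static List<String> run(boolean useSelf) {
        INTERCEPTED.clear();
        LoaderImpl target = new LoaderImpl();
        target.useSelf = useSelf;
        InvocationHandler handler = (proxy, method, args) -> {
            INTERCEPTED.add(method.getName()); // "begin transaction" would go here
            return method.invoke(target, args);
        };
        Loader proxy = (Loader) Proxy.newProxyInstance(
                Loader.class.getClassLoader(), new Class<?>[] {Loader.class}, handler);
        target.self = proxy;
        proxy.loadErUp();
        return new ArrayList<>(INTERCEPTED);
    }

    public static void main(String[] args) {
        System.out.println("this.loadPartition(): " + run(false)); // [loadErUp]
        System.out.println("self.loadPartition(): " + run(true));  // [loadErUp, loadPartition]
    }
}
```

Only calls that go through the proxy reference (the `self` field here, `myLoaderClass` in the answer) are intercepted.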

So it will look like:

@Component
public class MyLoaderClass {

    // Self-injected proxy: use field injection with @Autowired (plain
    // constructor self-injection is circular unless the parameter is @Lazy)
    @Autowired
    private MyLoaderClass myLoaderClass;

    // Removed @Transactional annotation
    public void loadErUp() {
        myLoaderClass.loadPartition(holder, zdt);
        ...
    }

    // 1) Add the @Transactional annotation to #loadPartition
    // 2) Make public to use self-autowiring (or refactored class, per link above)
    @Transactional
    public <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        ...
        // Can remove the TransactionSynchronizationManager call
        ...
    }

}
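Put together, each partition borrows its own connection on its own thread and gives it back when its own transaction ends. The following JDK-only toy models that lifecycle under loud assumptions: a 100-permit `Semaphore` plays the connection pool, the hypothetical `inTransaction` helper plays the role of the proxied `@Transactional` call (borrow, run, always release in `finally`), and each partition returns a fake row count of `partitionKey * 10`. It is not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class PerThreadTxDemo {
    static final int PARTITIONS = 12;
    static final int POOL_SIZE = 100;

    // Hypothetical stand-in for a proxied @Transactional call: borrow a
    // connection on the current thread, run the work, always give it back.
    static long inTransaction(Semaphore pool, LongSupplier work) {
        boolean acquired;
        try {
            acquired = pool.tryAcquire(30, TimeUnit.SECONDS); // like the pool's 30 s wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (!acquired) {
            throw new IllegalStateException("Unable to fetch a connection in 30 seconds");
        }
        try {
            return work.getAsLong(); // "load the partition"
        } finally {
            pool.release();          // "commit" and return the connection
        }
    }

    // One refresh: every partition runs on a pool thread inside its own "transaction".
    public static long loadErUp(Semaphore pool, ForkJoinPool workers) {
        List<CompletableFuture<Long>> calls = new ArrayList<>();
        for (long key = 1; key <= PARTITIONS; key++) {
            final long partitionKey = key;
            calls.add(CompletableFuture.supplyAsync(
                    () -> inTransaction(pool, () -> partitionKey * 10), // fake row count
                    workers));
        }
        // Wait for all partitions and add up their row counts.
        return calls.stream().mapToLong(CompletableFuture::join).sum();
    }

    // Runs the refresh `runs` times and returns how many connections are free afterwards.
    public static int freeConnectionsAfter(int runs) {
        Semaphore pool = new Semaphore(POOL_SIZE);
        ForkJoinPool workers = new ForkJoinPool(PARTITIONS);
        try {
            for (int i = 0; i < runs; i++) {
                loadErUp(pool, workers);
            }
        } finally {
            workers.shutdown();
        }
        return pool.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("free connections after 20 runs: " + freeConnectionsAfter(20));
    }
}
```

Because every borrowed permit is released in `finally`, all 100 connections are free again after each run, however many runs there are.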

You'll also want to make sure a new run of your batch job doesn't start before the previous one has completed. You can solve this by using the @Scheduled annotation for your table load, for example with fixedDelay, which waits for the given delay after one run finishes before starting the next, so the runs don't "overlap".
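`@Scheduled(fixedDelay = ...)` measures the delay from the end of one invocation to the start of the next. A JDK analogue of that behavior, swapped in here only because it runs without Spring, is `ScheduledExecutorService.scheduleWithFixedDelay`. The sketch below schedules a 50 ms job with a 10 ms delay on a four-thread scheduler and records the highest number of runs in progress at once; `FixedDelayDemo` and `maxOverlap` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {
    // Schedules a slow job with a short fixed delay and returns the highest
    // number of executions that were ever in progress at the same time.
    public static int maxOverlap(int runs) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(runs);
        // Four threads, so overlapping runs would be possible if the schedule allowed them.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(50); // the "refresh" takes longer than the 10 ms delay
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    finished.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            finished.await(10, TimeUnit.SECONDS); // wait for `runs` executions
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max overlapping runs: " + maxOverlap(5));
    }
}
```

It reports one run at a time, even though the job takes longer than the delay and spare scheduler threads are available.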

Dovmo
  • You are just single threading this with one big SQL call. I want to load a table using multiple threads. The reason we are "partitioning" the sql table is so we can load it faster. I want to open 12 streams on 12 threads and read the database. Will spring handle this? – markthegrea Dec 15 '18 at 11:39
  • Revised my answer entirely. This should be more what you're looking to do. – Dovmo Dec 17 '18 at 04:59
  • Thanks for your help! I was calling the Transactional method from the same class and that was the problem, as you pointed out. I am not sure how I feel about having an instance of the class autowired into the same class, but it does make the code simpler! Clever code! – markthegrea Dec 17 '18 at 14:36