
I need to process a large stream of JPA entities coming from a Spring Data JPA repository. Streaming works perfectly, but it requires a transaction. Since other operations must start based on the resulting stream elements, I need to do the processing outside of the transaction.

My idea is:

  1. Start a sub-thread.
  2. Inside this thread, under the transaction, consume the stream and put its elements into some queue.
  3. In the main thread, monitor both the queue and the running sub-thread.
  4. Consume elements from the queue, doing the processing.
  5. When the sub-thread has finished its streaming transaction, process the rest of the queue and move on.

Is there any recommended 'standard' approach for doing this? I do not see any out-of-the-box solution. That looks very strange to me, but it is what I see.

Roman Nikitchenko
  • Well, the first thing that pops into my head is to publish each stream element to a JMS queue as an `ObjectMessage`. Since you're saying the stream is going to be large, the gotcha you're going to face is how to avoid a long-running transaction vs an out-of-memory error. You'll probably want persistent messages/disk spooling along with limits for JMS destination memory usage. – crizzis Jan 05 '18 at 12:31
  • Why not use Spring Batch processing along with asynchronous processing? – Yogi Jan 05 '18 at 13:18
  • @crizzis I'd really want to avoid persistence here. Just process the stream as it comes out of the transaction :-). @Yogi It seems I have now solved the task and the approach has started to work. But I will definitely look at `Spring Batch` for the future. – Roman Nikitchenko Jan 05 '18 at 15:17
  • I'm not sure what you mean by 'avoid persistence'. The way I read your question is: 'I want to process large query results asynchronously, one by one, without reading them into memory all at once. What's the standard approach?'. The answer was 'use JMS, it will give you configurable queue management out of the box'. The solution that you suggested is based on *blocking* operations, will not scale and, worst of all, leaves you with a long-running transaction, because the producer *blocks* on the queue. – crizzis Jan 05 '18 at 16:14
  • Alternatively, if you don't care about long-running transactions, just use spring events: http://www.baeldung.com/spring-events. No need to reinvent the wheel – crizzis Jan 05 '18 at 16:19
  • OK, in fact it turned out my only option is one big read-write transaction with careful entity management, including manual flushes. This is because it was part of a database migration (FlyWay) covered by its own transaction. Multi-threading is definitely not welcome there, and I did some investigation to prove that the memory usage pattern scales with a sensible approach. – Roman Nikitchenko Jan 11 '18 at 15:07
  • Still, the solution that I have provided as an answer solves the original question. The only thing is, the question changed as soon as I received the answer :-). – Roman Nikitchenko Jan 11 '18 at 15:08

1 Answer


OK, this is what solved the task, with some variations. Conceptual running code is below.

  1. The producer is annotated with `@Transactional` and consumes the stream; the stream elements then go into a queue. The good thing is that now I only need a read-only transaction.
  2. The consumer is outside of the read-only transaction, doing the normal processing.
  3. The code watches both of them to synchronise properly.

Original code was taken from here: Producer/Consumer threads using a Queue

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.Optional;

public class Main {

    static class Producer implements Runnable {

        private final BlockingQueue<Optional<Integer>> sharedQueue;
        private final int threadNo;

        public Producer(BlockingQueue<Optional<Integer>> sharedQueue, int threadNo) {
            this.threadNo = threadNo;
            this.sharedQueue = sharedQueue;
        }

        @Override
        public void run() {
            for(int i=1; i<= 5; i++){
                try {
                    Integer number = i+(10*threadNo);
                    System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                    sharedQueue.put(Optional.of(number));
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
            // Signal consumer to stop.
            try {
                sharedQueue.put(Optional.empty());
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {

        private final BlockingQueue<Optional<Integer>> sharedQueue;
        private final int threadNo;

        public Consumer (BlockingQueue<Optional<Integer>> sharedQueue,int threadNo) {
            this.sharedQueue = sharedQueue;
            this.threadNo = threadNo;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Optional<Integer> num = sharedQueue.take();
                    if (!num.isPresent()) {
                        System.out.println("Consumed TERMINATOR");
                        break;
                    }
                    Thread.sleep(100);
                    System.out.println("Consumed: " + num.get() + ":by thread:" + threadNo);
                } catch (Exception err) {
                   err.printStackTrace();
                }
            }
        }
    }

    private static void shutdownService(final ExecutorService es) {
        es.shutdown();
        try {
            if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
                es.shutdownNow();
            }
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(final String args[]) {

        BlockingQueue<Optional<Integer>> sharedQueue = new LinkedBlockingQueue<>();

        ExecutorService pes = Executors.newFixedThreadPool(1);
        ExecutorService ces = Executors.newFixedThreadPool(1);

        Future<?> consumed = ces.submit(new Consumer(sharedQueue, 1));
        Future<?> produced = pes.submit(new Producer(sharedQueue, 1));

        System.out.println("Submitted...");
        try {
            produced.get();
            System.out.println("Producer finished...");
            consumed.get();
            System.out.println("Consumer finished...");
        } catch (final Exception e) {
            throw new RuntimeException(e);
        } finally {
            shutdownService(ces);
            shutdownService(pes);
        }
        System.out.println("Done...");
    }

}
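The comments above raise a memory concern for large streams: an unbounded `LinkedBlockingQueue` can grow without limit if the consumer falls behind. One variation (my sketch, not part of the original answer; `BoundedDemo` and `drain` are hypothetical names) is to use a bounded `ArrayBlockingQueue`, so the producer blocks once the queue is full and memory stays capped at the queue capacity:

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedDemo {

    // Produces 1..n into a small bounded queue on a background thread,
    // consumes on the calling thread, and returns the sum of all items.
    static int drain(int n) throws InterruptedException {
        // Capacity 10: the producer blocks instead of buffering the whole stream.
        BlockingQueue<Optional<Integer>> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(Optional.of(i)); // blocks while the queue is full
                }
                queue.put(Optional.empty());   // terminator ("poison pill")
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        while (true) {
            Optional<Integer> item = queue.take();
            if (!item.isPresent()) {
                break; // terminator seen, the stream is done
            }
            sum += item.get();
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum: " + drain(100)); // prints "Sum: 5050"
    }
}
```

Note the trade-off crizzis pointed out: with a bounded queue the producer blocks, which keeps the read transaction open for as long as the consumer is slow. Bounding only makes sense if you accept that long-running transaction.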
Roman Nikitchenko
  • In reality I have dropped this approach in favour of more aggressive entity management under a single big transaction. The reason for this was the need to serve a FlyWay migration, which is one crazy big transaction you cannot do anything with. But the original question was really addressed by this code. – Roman Nikitchenko Jan 11 '18 at 15:04