4

Scenario:

  1. In my application, there are 3 processes which copy documents to a shared drive, each into its own folder.
  2. As soon as any document is copied to the shared drive (by any process), the directory watcher (Java) code picks up the document, calls the Python script using "Process", and does some processing on the document. The code snippet is as follows:

    Process pr = Runtime.getRuntime().exec(pythonCommand);
    // retrieve output from python script
    BufferedReader bfr = new BufferedReader(new InputStreamReader(pr.getInputStream()));
    String line = "";
    while ((line = bfr.readLine()) != null) {
        // display each output line from python script
        logger.info(line);
    }
    pr.waitFor();
    
  3. Currently my code waits until the Python code has finished executing on the document. Only after that does it pick up the next document. The Python code takes 30 seconds to complete.

  4. After processing, the document is moved from the current folder to the archive OR error folder.
  5. Please find below a screenshot of the scenario: [screenshot]

What is the problem?

  1. My code processes documents sequentially, and I need to process them in parallel.
  2. As the Python code takes around 30 seconds, some of the events created by the directory watcher are also getting lost.
  3. If around 400 documents arrive within a short span of time, document processing stops.

What I am looking for?

  1. A design solution for processing documents in parallel.
  2. In case of any failure during document processing, pending documents must be processed automatically.
  3. I tried the Spring Boot scheduler as well, but documents are still processed sequentially.
  4. Is it possible to call the Python code in parallel as a background process?

Sorry for the long question, but I have been stuck on this for many days and have already looked at many similar questions. Thank you!

Anurag

3 Answers

2

One option would be to use an ExecutorService provided by the JDK, which can execute Runnable and Callable tasks. You will need to create a class that implements Runnable and executes your Python script; after receiving a new document, create a new instance of this class and pass it to the ExecutorService.

To show how this works, we will use a simple Python script that takes a thread name as an argument, prints the start time of its execution, sleeps 10 seconds and prints the end time:

import time
import sys

print("%s start : %s" % (sys.argv[1], time.ctime()))
time.sleep(10)
print("%s end : %s" % (sys.argv[1], time.ctime()))

First, we implement the class that runs the script and passes it the name obtained in the constructor:

import java.io.BufferedReader;
import java.io.InputStreamReader;

class ScriptRunner implements Runnable {

    private String thread;

    ScriptRunner(String thread) {
        this.thread = thread;
    }

    @Override
    public void run() {
        try {
            ProcessBuilder ps = new ProcessBuilder("py", "test.py", thread);
            ps.redirectErrorStream(true);
            Process pr = ps.start();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            }
            pr.waitFor();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Then we create a main method that creates an ExecutorService with a fixed pool of 5 threads and submits 10 instances of ScriptRunner to it at 1-second intervals:

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 1; i <= 10; i++) {
        executor.submit(new ScriptRunner("Thread_" + i));
        Thread.sleep(1000);
    }
    executor.shutdown();
}

If we run this method, we will see that the service, due to the specified limit, runs at most 5 tasks in parallel, while the rest wait in the queue and start as threads become free:

Thread_1 start : Sat Nov 23 11:40:14 2019
Thread_1 end : Sat Nov 23 11:40:24 2019    // the first task is completed..
Thread_2 start : Sat Nov 23 11:40:15 2019
...
Thread_5 end : Sat Nov 23 11:40:28 2019
Thread_6 start : Sat Nov 23 11:40:24 2019  // ..and the sixth is started
...
Thread_10 end : Sat Nov 23 11:40:38 2019
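
Since the question also asks about moving each document to an archive or error folder, here is a minimal sketch of a Callable-based variant of the same idea. It is only an illustration under assumptions: the class name ExitCodeRouting, the helpers runScript and processAll, and the convention that exit code 0 means success are all hypothetical and not part of the code above. Each job returns an exit code through a Future, and the caller decides between "archive" and "error".

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ExitCodeRouting {

    // Hypothetical helper: runs one external command and returns its exit code.
    static int runScript(List<String> command) throws Exception {
        // inheritIO() forwards the script's output to this JVM's console,
        // so no reader loop is needed to keep the output pipe from filling up.
        Process pr = new ProcessBuilder(command).inheritIO().start();
        return pr.waitFor();
    }

    // Runs one job per document on a fixed pool and maps each document to
    // "archive" (exit code 0) or "error" (non-zero exit code or exception).
    static Map<String, String> processAll(List<String> documents,
                                          Function<String, Callable<Integer>> jobFor,
                                          int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Map<String, Future<Integer>> futures = new LinkedHashMap<>();
        for (String doc : documents) {
            futures.put(doc, executor.submit(jobFor.apply(doc)));
        }
        executor.shutdown(); // no new tasks; already submitted ones still run

        Map<String, String> destination = new LinkedHashMap<>();
        for (Map.Entry<String, Future<Integer>> e : futures.entrySet()) {
            String target;
            try {
                target = (e.getValue().get() == 0) ? "archive" : "error";
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                target = "error";
            } catch (ExecutionException ex) {
                target = "error"; // the job itself threw an exception
            }
            destination.put(e.getKey(), target);
        }
        return destination;
    }

    public static void main(String[] args) {
        // Assumption: "py test.py <document>" mirrors the command used above.
        List<String> docs = Arrays.asList("a.pdf", "b.pdf");
        Map<String, String> result = processAll(
                docs, doc -> () -> runScript(Arrays.asList("py", "test.py", doc)), 5);
        System.out.println(result);
    }
}
```

Passing the job as a function keeps the routing logic independent of how the script is actually started, so it can be tried out with fake jobs before wiring in the real Python command.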
Anar Sultanov
  • Thank you very much for your help! Initially, I was getting an error, but finally I was able to resolve it. This worked for me. – Anurag Nov 26 '19 at 04:51
  • FYI - I was getting the error only in the Python script; in PyCharm, "%s" was not being recognized. So I used the following script, and after that it worked:

        import datetime
        import time
        startTime = datetime.datetime.now()
        print("start =", startTime)
        time.sleep(10)
        endTime = datetime.datetime.now()
        print("end ===", endTime)

    – Anurag Nov 26 '19 at 08:22
1

You can try the multiprocessing module in Python.

Because of the GIL, Python's threading will not speed up computations that are CPU bound.

Possible duplicate of this question: Solving embarassingly parallel problems using Python multiprocessing

asing177
0
  • Create two queues (Blocking Queue):
    executionQueue
    errorQueue

  • Create two threads (you can create as many as you want, based on your needs):
    FirstThread
    SecondThread

  • Concept:
    Producer-Consumer

  • Producer (directory watcher thread):
    Directory watcher

  • Consumer:
    FirstThread
    SecondThread
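
The structure above can be sketched roughly as follows. This is a minimal illustration, not a full implementation: the class name ProducerConsumerSketch, the STOP marker, and the run method are hypothetical, and the call to the Python script is replaced by a placeholder. Note that the java.util.concurrent.BlockingQueue implementations are already thread-safe, so this sketch needs no explicit locking around the queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerSketch {

    // Hypothetical sentinel ("poison pill") that tells a consumer to stop.
    static final String STOP = "__STOP__";

    // Puts every path on executionQueue, lets `consumers` threads drain it,
    // and returns the set of paths that were handled.
    static Set<String> run(List<String> paths, int consumers) {
        BlockingQueue<String> executionQueue = new LinkedBlockingQueue<>();
        Set<String> handled = ConcurrentHashMap.newKeySet();

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        // take() blocks while the queue is empty
                        String path = executionQueue.take();
                        if (STOP.equals(path)) {
                            return;
                        }
                        // Placeholder for calling the Python script on `path`.
                        handled.add(path);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + (i + 1));
            t.start();
            workers.add(t);
        }

        // Producer side: in the real system this would be the directory watcher.
        executionQueue.addAll(paths);
        for (int i = 0; i < consumers; i++) {
            executionQueue.add(STOP); // one stop marker per consumer
        }

        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        Set<String> handled = run(Arrays.asList("a.pdf", "b.pdf", "c.pdf"), 2);
        System.out.println("Handled " + handled.size() + " documents");
    }
}
```

In a long-running service the consumers would not be stopped with a marker like this; it is only here so that the example terminates.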

Details:

  • The add and remove methods of both queues must be synchronized, so that only one thread accesses them at any moment. If one thread is in the critical section (producer or consumer), the remaining threads wait for their turn.
  • The producer starts working first; initially, the consumers are sleeping.

    Why? To keep the whole system running in step.

    How do you get that? Put the producer thread to sleep after it has processed an event, and have each consumer sleep at the start of its job.

  • Whichever thread (producer or consumer) arrives first acquires the lock on the queue, does its work, and releases it. If any other thread (producer or consumer) comes to fetch data in the meantime, it waits for its turn (using the concept of a thread pool).

  • As soon as any document is copied to the shared drive (by any process), the directory watcher (producer) code picks up the path of that document and stores it in executionQueue synchronously.

  • Now the consumers come to fetch the data. FirstThread wakes up first and goes to fetch data from executionQueue: it acquires the lock on executionQueue, fetches the data, and releases the lock. If SecondThread comes to fetch data in the meantime, it waits for its turn.

  • After fetching the data from executionQueue, FirstThread picks up the document from its location and calls the Python script with it.

  • Meanwhile, SecondThread acquires the lock, fetches a path, and starts processing it in the same way as FirstThread.

  • A few seconds later, FirstThread finishes its job, goes back to executionQueue, acquires the lock again, fetches the next file path, releases the lock, and starts processing again. The same goes for SecondThread.

  • If any error occurs while processing a file, send that path to errorQueue, and analyze the contents of errorQueue at the end of the day or when your system is idle, either using the same concept or manually.

  • If no data is available in executionQueue, the producer thread (directory watcher) is already sleeping at that moment. A consumer thread that comes to executionQueue to fetch data will not get any, so it goes to sleep for, say, 1 minute; after that it wakes up and tries to fetch data again, and so on.

  • Logging information at each step will help you with later analysis.

  • Using this concept, you can run the whole system in parallel.
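
The consumer side with the error queue could look something like the sketch below. It is an illustration under assumptions: ErrorQueueSketch and drain are hypothetical names, the processor argument stands in for the call to the Python script, and instead of a fixed sleep-and-retry it uses BlockingQueue.poll with a timeout, which waits for new work and returns null if none arrives in time. Here the loop simply ends when the queue stays empty; a real consumer would loop again.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ErrorQueueSketch {

    // Handles paths until executionQueue stays empty for `idleMillis`, and
    // returns how many were processed successfully. Failed paths go to errorQueue.
    static int drain(BlockingQueue<String> executionQueue,
                     BlockingQueue<String> errorQueue,
                     Consumer<String> processor,
                     long idleMillis) {
        int succeeded = 0;
        try {
            String path;
            // poll(timeout) waits for work; null means the queue stayed empty
            while ((path = executionQueue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
                try {
                    processor.accept(path); // e.g. run the Python script on this path
                    succeeded++;
                } catch (RuntimeException e) {
                    // Keep the path for later analysis or a retry.
                    // add() assumes an unbounded error queue.
                    errorQueue.add(path);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return succeeded;
    }

    public static void main(String[] args) {
        BlockingQueue<String> executionQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> errorQueue = new LinkedBlockingQueue<>();
        executionQueue.add("a.pdf");
        executionQueue.add("broken.pdf");
        executionQueue.add("c.pdf");

        int ok = drain(executionQueue, errorQueue, path -> {
            // Simulated failure, only for the purpose of this example.
            if (path.startsWith("broken")) {
                throw new IllegalStateException("simulated failure");
            }
        }, 100);

        System.out.println(ok + " processed, failed: " + errorQueue);
    }
}
```

Several consumer threads can call drain on the same two queues at once, since the queue operations themselves are thread-safe.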

Tapan