
I have a program that parses reports coming from multiple devices (about 1000 devices), saves them to a DB and then does additional processing on them.

Parsing the reports can be done concurrently, but saving to the DB and the additional processing require some synchronization based on the device ID the reports come from (since two reports might need to update the same data in the DB).

So, I can run the processing in parallel as long as the threads are handling reports from different device IDs.

What could be the most efficient way to process this?

Example

I initially thought about using a thread pool and locking on the device ID, but that won't be efficient if I get a burst of reports coming from a single device.

For example, considering a thread pool with 4 threads and 10 incoming reports:

Report # DeviceID
1 A
2 A
3 A
4 A
5 A
6 B
7 C
8 D
9 E
10 F

Thread 1 would start processing A's first report, threads 2-4 would wait until thread 1 finishes, and the rest of the reports would get queued.

It would be more efficient if the rest of A's reports could be queued instead, allowing B/C/D reports to be processed concurrently. Is there an efficient way to do this?
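A minimal sketch of the queueing behavior described above, written against JDK 8 APIs only. The class name `PerDeviceSerializer` and its methods are hypothetical, not from any library: each device gets its own waiting queue, and only one task per device is ever handed to the shared pool at a time. The `demo()` method replays the ten reports from the table with a 4-thread pool and a 50 ms sleep standing in for the DB work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs tasks of the same device one at a time, in arrival order, on a shared pool. */
class PerDeviceSerializer {
    private final Executor pool;
    // A device has an entry here only while one of its tasks is running or waiting.
    private final Map<String, Queue<Runnable>> waiting = new HashMap<>();

    PerDeviceSerializer(Executor pool) { this.pool = pool; }

    synchronized void submit(String deviceId, Runnable task) {
        Queue<Runnable> queue = waiting.get(deviceId);
        if (queue == null) {
            // Device is idle: mark it busy and hand the task to the pool right away.
            waiting.put(deviceId, new ArrayDeque<>());
            pool.execute(() -> runThenContinue(deviceId, task));
        } else {
            // Device is busy: park the task without occupying a pool thread.
            queue.add(task);
        }
    }

    private void runThenContinue(String deviceId, Runnable task) {
        try {
            task.run();
        } finally {
            Runnable next = pollNext(deviceId);
            if (next != null) {
                pool.execute(() -> runThenContinue(deviceId, next));
            }
        }
    }

    private synchronized Runnable pollNext(String deviceId) {
        Runnable next = waiting.get(deviceId).poll();
        if (next == null) waiting.remove(deviceId); // nothing left: device is idle again
        return next;
    }

    /** Replays the ten reports from the table and returns labels such as "A1" in completion order. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PerDeviceSerializer serializer = new PerDeviceSerializer(pool);
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        String[] devices = {"A", "A", "A", "A", "A", "B", "C", "D", "E", "F"};
        CountDownLatch done = new CountDownLatch(devices.length);
        for (int i = 0; i < devices.length; i++) {
            String label = devices[i] + (i + 1);
            serializer.submit(devices[i], () -> {
                try {
                    Thread.sleep(50); // stand-in for the DB save and extra processing
                    processed.add(label);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return new ArrayList<>(processed);
    }
}
```

In this sketch a burst from device A occupies at most one pool thread, so the reports from B, C and D start immediately on the other three threads. The single lock around the map is a simplification; whether it matters at about 40 reports per second would need measuring.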

btnk
  • Just a suggestion and not a well-thought-out answer like those below, but you could use a large thread pool (sized to avoid system overload if all threads are running at once) and within each thread synchronize on the device ID (as an intern'd string or similar). Probably not quite as efficient as the suggested answers, but a lot simpler. – Daniel Dec 16 '20 at 19:36
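A rough sketch of the approach in the comment above, under a few assumptions: the class `DeviceLocks` is hypothetical, the pool size of 16 is arbitrary, and a `ConcurrentHashMap` of lock objects is swapped in for the intern'd strings the comment mentions. The `demo()` method submits eight tasks for one device and records how many ran at the same time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: one lock object per device ID, created on first use. */
class DeviceLocks {
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    void runExclusively(String deviceId, Runnable work) {
        Object lock = locks.computeIfAbsent(deviceId, id -> new Object());
        synchronized (lock) {
            work.run(); // at most one thread per device gets here at a time
        }
    }

    /** Returns the highest number of tasks seen running at once for device "A". */
    static int demo() {
        DeviceLocks deviceLocks = new DeviceLocks();
        ExecutorService pool = Executors.newFixedThreadPool(16); // arbitrary "large" pool
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        for (int i = 0; i < 8; i++) {
            pool.execute(() -> deviceLocks.runExclusively("A", () -> {
                int now = inFlight.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(10); // stand-in for the DB save
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
            }));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }
}
```

Note that `synchronized` makes no fairness promise, so reports from one device may be processed in a different order than they arrived, and every waiting report still holds a pool thread while it waits.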

2 Answers


Try using a priority queue. The highest priority items in the queue would be chosen for processing by the thread pool. For example:

NOTE: I know priority queues are not typically implemented using an array and that some priority queues use smaller index values for higher priority. I just use this notation for simplicity's sake.

Let each queue entry be a pair (DeviceID, Priority), and let the thread pool start out empty: []

Say we get 10 incoming reports: [(A, 1), (A, 1), (A, 1), (B, 1), (B, 1), (C, 1), (D, 1), (E, 1), (F, 1), (G, 1)]. This is the priority queue after all the reports have been received.

So, you dequeue the first item and give it to the thread pool. Then you decrement the priority of every item still in the priority queue that has DeviceID A. This would look like the following:

(A, 1) is dequeued, so the thread pool receives A. After the priorities of the A entries still in the queue are decremented, those entries shift to the back: [(B, 1), (B, 1), (C, 1), (D, 1), (E, 1), (F, 1), (G, 1), (A, 0), (A, 0)]
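The dequeue-and-demote step can be sketched as follows. This is only an illustration under assumptions of my own: the class `DemotingQueue`, its `Entry` type, and the arrival sequence number used to break ties are all hypothetical. `java.util.PriorityQueue` has no way to change the priority of an entry in place, so the sketch removes the affected entries and adds them back with a lower priority.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical queue that demotes the remaining entries of a device each time one is taken. */
class DemotingQueue {
    /** Queue entry: device ID, priority (higher runs first), arrival sequence number. */
    static final class Entry {
        final String deviceId;
        final int priority;
        final long seq;

        Entry(String deviceId, int priority, long seq) {
            this.deviceId = deviceId;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Highest priority first; ties are broken by arrival order.
    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> -e.priority).thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    void add(String deviceId) {
        queue.add(new Entry(deviceId, 1, nextSeq++));
    }

    /** Removes the head and lowers the priority of every queued entry of the same device by one. */
    String take() {
        Entry head = queue.poll();
        if (head == null) return null;
        List<Entry> sameDevice = new ArrayList<>();
        for (Entry e : queue) { // full scan of the queue on every take
            if (e.deviceId.equals(head.deviceId)) sameDevice.add(e);
        }
        queue.removeAll(sameDevice);
        for (Entry e : sameDevice) {
            queue.add(new Entry(e.deviceId, e.priority - 1, e.seq));
        }
        return head.deviceId;
    }

    /** Loads the ten example reports and returns the device IDs in the order they are taken. */
    static List<String> demo() {
        DemotingQueue q = new DemotingQueue();
        for (String id : new String[] {"A", "A", "A", "B", "B", "C", "D", "E", "F", "G"}) {
            q.add(id);
        }
        List<String> order = new ArrayList<>();
        for (String id = q.take(); id != null; id = q.take()) {
            order.add(id);
        }
        return order;
    }
}
```

Each `take()` scans the whole queue to find entries of the same device. The sketch also only reorders the reports; it does not by itself stop two reports of one device from running at the same time.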

Luke
  • Thanks for the idea, but updating the queue wouldn't really be feasible, since we might get about 40 reports per second. – btnk Dec 16 '20 at 19:36

Project Loom

Having watched some late-2020 videos on YouTube.com with Ron Pressler, head of Project Loom at Oracle, I think the solution is quite simple with the new virtual threads (fibers) feature coming to a future release of Java:

  • Call a new Executors method to create an executor service that uses virtual threads (fibers) rather than platform/kernel threads.
  • Submit all incoming report processing tasks to that executor service.
  • Inside each task, acquire the semaphore for that report's device. Keep one semaphore for each of your 1,000 devices.

That semaphore will be the way to process only one input per device at a time, to parallelize per source-device. If the semaphore representing a particular device is not available, simply block — let your report processing thread wait until the semaphore is available.

Project Loom maintains many lightweight virtual threads (fibers), even millions, that are run on a few heavyweight platform/kernel threads. This makes blocking a thread cheap.

Early builds of a JDK binary with Project Loom built-in for macOS/Linux/Windows are available now.

Caveat: I’m no expert on concurrency nor on Project Loom. But your particular use-case seems to match some specific recommendations made by Ron Pressler in his videos.

Example code

Here is some example code that I noodled around with. I am not at all sure whether this is a good example.

I used an early-access build of Java 16, specially built with Project Loom technology: Build 16-loom+9-316 (2020/11/30) for macOS Intel.

package work.basil.example;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * An example of using Project Loom virtual threads to more simply process incoming data on background threads.
 * <p>
 * This code was built as a possible solution to this Question at StackOverflow.com: https://stackoverflow.com/q/65327325/642706
 * <p>
 * Posted in my Answer at StackOverflow.com: https://stackoverflow.com/a/65328799/642706
 * <p>
 * ©2020 Basil Bourque. 2020-12.
 * <p>
 * This work by Basil Bourque is licensed under CC BY 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by/4.0
 * <p>
 * Caveats:
 * - Project Loom is still in early-release, available only as a special build of OpenJDK for Java 16.
 * - I am *not* an expert on concurrency in general, nor Project Loom in particular. This code is merely me guessing and experimenting.
 */
public class App
{
    // FYI, Project Loom links:
    // https://wiki.openjdk.java.net/display/loom/Main
    // http://jdk.java.net/loom/  (special early-access builds of Java 16 with Project Loom built-in)
    // https://download.java.net/java/early_access/loom/docs/api/ (Javadoc)
    // https://www.youtube.com/watch?v=23HjZBOIshY  (Ron Pressler talk, 2020-07)

    public static void main ( String[] args )
    {
        System.out.println( "java.version: " + System.getProperty( "java.version" ) );
        App app = new App();
        app.checkForProjectLoom();
        app.demo();
    }

    public static boolean projectLoomIsPresent ( )
    {
        try
        {
            Thread.class.getDeclaredMethod( "startVirtualThread" , Runnable.class );
            return true;
        }
        catch ( NoSuchMethodException e )
        {
            return false;
        }
    }

    private void checkForProjectLoom ( )
    {
        if ( App.projectLoomIsPresent() )
        {
            System.out.println( "INFO - Running on a JVM with Project Loom technology. " + Instant.now() );
        } else
        {
            throw new IllegalStateException( "Project Loom technology not present in this Java implementation. " + Instant.now() );
        }
    }

    record ReportProcessorRunnable(Semaphore semaphore , Integer deviceIdentifier , boolean printToConsole , Queue < String > fauxDatabase) implements Runnable
    {
        @Override
        public void run ( )
        {
            // Our goal is to serialize the report-processing per device.
            // Each device can have only one report being processed at a time.
            // In Project Loom this can be accomplished simply by spawning virtual threads for all such
            // reports but process them serially by synchronizing on a binary (single-permit) semaphore.
            // Each thread working on a report submitted for that device waits on semaphore assigned to that device.
            // Blocking to wait for the semaphore is cheap in Project Loom using virtual threads. The underlying
            // platform/kernel thread carrying this virtual thread will be assigned other work while this
            // virtual thread is parked.
            try
            {
                semaphore.acquire(); // Blocks until the semaphore for this particular device becomes available. Blocking is cheap on a virtual thread.
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt(); // No permit was acquired, so restore the interrupt flag and give up on this report.
                return;
            }
            try
            {
                // Simulate more lengthy work being done by sleeping the virtual thread handling this task via the executor service.
                try {Thread.sleep( Duration.ofMillis( 100 ) );} catch ( InterruptedException e ) {e.printStackTrace();}
                String fauxData = "Insert into database table for device ID # " + this.deviceIdentifier + " at " + Instant.now();
                fauxDatabase.add( fauxData );
                if ( this.printToConsole ) { System.out.println( fauxData ); }
            }
            finally
            {
                semaphore.release();  // Always give the permit back, even if processing throws. For fun, comment-out this line to see the effect of the per-device semaphore at runtime.
            }
        }
    }

    record IncomingReportsSimulatorRunnable(Map < Integer, Semaphore > deviceToSemaphoreMap ,
                                            ExecutorService reportProcessingExecutorService ,
                                            int countOfReportsToGeneratePerBatch ,
                                            boolean printToConsole ,
                                            Queue < String > fauxDatabase)
            implements Runnable
    {
        @Override
        public void run ( )
        {
            if ( printToConsole ) System.out.println( "INFO - Generating " + countOfReportsToGeneratePerBatch + " reports at " + Instant.now() );
            for ( int i = 0 ; i < countOfReportsToGeneratePerBatch ; i++ )
            {
                // Make a new Runnable task containing report data to be processed, and submit this task to the executor service using virtual threads.
                // To simulate a device sending in a report, we randomly pick one of the devices to pretend it is our source of report data.
                final List < Integer > deviceIdentifiers = List.copyOf( deviceToSemaphoreMap.keySet() );
                int randomIndexNumber = ThreadLocalRandom.current().nextInt( 0 , deviceIdentifiers.size() );
                Integer deviceIdentifier = deviceIdentifiers.get( randomIndexNumber );
                Semaphore semaphore = deviceToSemaphoreMap.get( deviceIdentifier );
                Runnable processReport = new ReportProcessorRunnable( semaphore , deviceIdentifier , printToConsole , fauxDatabase );
                reportProcessingExecutorService.submit( processReport );
            }
        }
    }

    private void demo ( )
    {
        // Configure experiment.
        Duration durationOfExperiment = Duration.ofSeconds( 20 );
        int countOfReportsToGeneratePerBatch = 7;  // Would be 40 per the Stack Overflow Question.
        boolean printToConsole = true;

        // To use as a concurrent list, I found this suggestion to use `ConcurrentLinkedQueue`: https://stackoverflow.com/a/25630263/642706
        Queue < String > fauxDatabase = new ConcurrentLinkedQueue < String >();

        // Represent each of the thousand devices that are sending us report data to be processed.
        // We map each device to a Java `Semaphore` object, to serialize the processing of multiple reports per device.
        final int firstDeviceNumber = 1_000;
        final int countDevices = 10; // Would be 1_000 per the Stack Overflow question.
        final Map < Integer, Semaphore > deviceToSemaphoreMap = new TreeMap <>();
        for ( int i = 0 ; i < countDevices ; i++ )
        {
            Integer deviceIdentifier = i + firstDeviceNumber; // Devices are numbered from 1,000 upward: 1,000 to 1,009 here, or 1,000 to 1,999 with a thousand devices.
            Semaphore semaphore = new Semaphore( 1 , true ); // A single permit to make a binary semaphore, and make it fair.
            deviceToSemaphoreMap.put( deviceIdentifier , semaphore );
        }

        // Run experiment.
        // Notice that in Project Loom the `ExecutorService` interface is now `AutoCloseable`, for use in try-with-resources syntax.
        try (
                ScheduledExecutorService reportGeneratingExecutorService = Executors.newSingleThreadScheduledExecutor() ;
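                ExecutorService reportProcessingExecutorService = Executors.newVirtualThreadExecutor() ; // Method name in this early-access build. Released JDKs (21 and later) name it `Executors.newVirtualThreadPerTaskExecutor()`.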
        )
        {
            Runnable simulateIncomingReports = new IncomingReportsSimulatorRunnable( deviceToSemaphoreMap , reportProcessingExecutorService , countOfReportsToGeneratePerBatch , printToConsole , fauxDatabase );
            ScheduledFuture < ? > scheduledFuture = reportGeneratingExecutorService.scheduleAtFixedRate( simulateIncomingReports , 0 , 1 , TimeUnit.SECONDS );
            try {Thread.sleep( durationOfExperiment );} catch ( InterruptedException e ) {e.printStackTrace();}
        }
        // Notice that when reaching this point we block until all submitted tasks still running are finished,
        // because that is the new behavior of `ExecutorService` being `AutoCloseable`.
        System.out.println( "INFO - executor services shut down at this point. " + Instant.now() );

        // Results of experiment
        System.out.println( "fauxDatabase.size(): " + fauxDatabase.size() );
        System.out.println( "fauxDatabase = " + fauxDatabase );
    }
}
Basil Bourque
  • This is very interesting and would simplify things. Unfortunately we're stuck with JDK 8 for the time being, but I'll keep an eye on it. Thanks. – btnk Dec 16 '20 at 19:35