Project Loom
Having watched some late-2020 videos on YouTube.com featuring Ron Pressler, head of Project Loom at Oracle, I think the solution is quite simple with the new virtual threads (fibers) feature coming to a future release of Java:
- Call a new `Executors` method to create an executor service that uses virtual threads (fibers) rather than platform/kernel threads.
- Submit all incoming report processing tasks to that executor service.
- Inside each task, attempt to grab a semaphore, one semaphore for each of your 1,000 devices.
That semaphore ensures that only one input per device is processed at a time, while inputs from different devices are processed in parallel. If the semaphore representing a particular device is not available, simply block: let your report-processing thread wait until the semaphore becomes available.
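The three steps above can be sketched in a few lines. This sketch targets the final Java 21 API rather than the early-access build used later in this answer, so the executor comes from `Executors.newVirtualThreadPerTaskExecutor()`. The class name `PerDeviceSemaphoreSketch`, the 5 ms of simulated work, and the device/report counts are hypothetical choices for illustration. The sketch also records the highest number of reports ever in progress at once for any single device, which is one way to check that the per-device semaphore actually serializes the work.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PerDeviceSemaphoreSketch {

    // Submits `reportCount` simulated reports, each from a randomly chosen device, and returns
    // the highest number of reports ever observed in progress at once for any single device.
    static int run(int deviceCount, int reportCount) {
        // One binary (single-permit), fair semaphore per device, plus a counter of reports in progress.
        Map<Integer, Semaphore> semaphores = new ConcurrentHashMap<>();
        Map<Integer, AtomicInteger> inProgress = new ConcurrentHashMap<>();
        for (int device = 0; device < deviceCount; device++) {
            semaphores.put(device, new Semaphore(1, true));
            inProgress.put(device, new AtomicInteger());
        }
        AtomicInteger maxObserved = new AtomicInteger();

        // Java 21 API: each submitted task runs on its own new virtual thread.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < reportCount; i++) {
                int device = ThreadLocalRandom.current().nextInt(deviceCount);
                Semaphore semaphore = semaphores.get(device);
                AtomicInteger counter = inProgress.get(device);
                executor.submit(() -> {
                    semaphore.acquire(); // Blocking here is cheap on a virtual thread.
                    try {
                        maxObserved.accumulateAndGet(counter.incrementAndGet(), Math::max);
                        Thread.sleep(5); // Simulated report processing.
                    } finally {
                        counter.decrementAndGet();
                        semaphore.release(); // Always hand the permit to the next report for this device.
                    }
                    return null; // Returning a value makes this a Callable, which may throw InterruptedException.
                });
            }
        } // close() waits until every submitted task has finished.
        return maxObserved.get();
    }

    public static void main(String[] args) {
        System.out.println("Most reports in progress at once for one device: " + run(10, 200));
    }
}
```

Because every report holds its device's only permit while it works, the printed maximum should be 1. If the `acquire`/`release` pair is removed, the value will typically rise above 1.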
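Project Loom maintains many lightweight virtual threads (fibers), even millions, that are run on a few heavyweight platform/kernel threads. When a virtual thread blocks, it is parked and its platform thread is freed to carry other virtual threads. This makes blocking a thread cheap.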
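To get a feel for how cheap blocking is, here is a small experiment, again written against the Java 21 API (`Executors.newVirtualThreadPerTaskExecutor()` and `Thread.sleep(Duration)`). It starts one virtual thread per task, and every task blocks. The class name `ManyVirtualThreadsSketch` and the task count of 10,000 are hypothetical choices for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ManyVirtualThreadsSketch {

    // Starts one virtual thread per task; each task blocks in sleep for `blockFor`.
    // Returns how many tasks completed.
    static int run(int taskCount, Duration blockFor) {
        AtomicInteger completed = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // Sleeping parks the virtual thread, freeing its carrier platform thread for others.
                    Thread.sleep(blockFor);
                    completed.incrementAndGet();
                    return null;
                });
            }
        } // close() waits for all tasks to finish.
        return completed.get();
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        int completed = run(10_000, Duration.ofSeconds(1));
        Duration elapsed = Duration.between(start, Instant.now());
        System.out.println(completed + " blocking tasks completed in " + elapsed);
    }
}
```

On a typical machine, the elapsed time should be in the neighborhood of one second, not thousands of seconds. The sleeps overlap because each parked virtual thread gives up its carrier thread instead of occupying one of a handful of pool threads.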
Early builds of a JDK binary with Project Loom built-in for macOS/Linux/Windows are available now.
Caveat: I’m no expert on concurrency or on Project Loom. But your particular use case seems to match some specific recommendations made by Ron Pressler in his videos.
Example code
Here is some example code that I noodled around with. I am not at all sure whether this is a good example.
I used an early-access build of Java 16, specially built with Project Loom technology: Build 16-loom+9-316 (2020/11/30) for macOS Intel.
package work.basil.example;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
/**
* An example of using Project Loom virtual threads to more simply process incoming data on background threads.
* <p>
* This code was built as a possible solution to this Question at StackOverflow.com: https://stackoverflow.com/q/65327325/642706
* <p>
* Posted in my Answer at StackOverflow.com: https://stackoverflow.com/a/65328799/642706
* <p>
* ©2020 Basil Bourque. 2020-12.
* <p>
* This work by Basil Bourque is licensed under CC BY 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by/4.0
* <p>
* Caveats:
* - Project Loom is still in early access, available only as a special build of OpenJDK for Java 16.
* - I am *not* an expert on concurrency in general, nor Project Loom in particular. This code is merely me guessing and experimenting.
*/
public class App
{
// FYI, Project Loom links:
// https://wiki.openjdk.java.net/display/loom/Main
// http://jdk.java.net/loom/ (special early-access builds of Java 16 with Project Loom built-in)
// https://download.java.net/java/early_access/loom/docs/api/ (Javadoc)
// https://www.youtube.com/watch?v=23HjZBOIshY (Ron Pressler talk, 2020-07)
public static void main ( String[] args )
{
System.out.println( "java.version: " + System.getProperty( "java.version" ) );
App app = new App();
app.checkForProjectLoom();
app.demo();
}
public static boolean projectLoomIsPresent ( )
{
try
{
Thread.class.getDeclaredMethod( "startVirtualThread" , Runnable.class );
return true;
}
catch ( NoSuchMethodException e )
{
return false;
}
}
private void checkForProjectLoom ( )
{
if ( App.projectLoomIsPresent() )
{
System.out.println( "INFO - Running on a JVM with Project Loom technology. " + Instant.now() );
} else
{
throw new IllegalStateException( "Project Loom technology not present in this Java implementation. " + Instant.now() );
}
}
record ReportProcessorRunnable(Semaphore semaphore , Integer deviceIdentifier , boolean printToConsole , Queue < String > fauxDatabase) implements Runnable
{
@Override
public void run ( )
{
// Our goal is to serialize the report-processing per device.
// Each device can have only one report being processed at a time.
// In Project Loom this can be accomplished simply by spawning virtual threads for all such
// reports but process them serially by synchronizing on a binary (single-permit) semaphore.
// Each thread working on a report submitted for that device waits on the semaphore assigned to that device.
// Blocking to wait for the semaphore is cheap in Project Loom using virtual threads. The underlying
// platform/kernel thread carrying this virtual thread will be assigned other work while this
// virtual thread is parked.
try
{
semaphore.acquire(); // Blocks until the semaphore for this particular device becomes available. Blocking is cheap on a virtual thread.
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt(); // Restore the interrupt status, and give up on this report.
return;
}
try
{
// Simulate more lengthy work being done by sleeping the virtual thread handling this task via the executor service.
Thread.sleep( Duration.ofMillis( 100 ) );
String fauxData = "Insert into database table for device ID # " + this.deviceIdentifier + " at " + Instant.now();
fauxDatabase.add( fauxData );
if ( this.printToConsole ) { System.out.println( fauxData ); }
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt(); // Restore the interrupt status.
}
finally
{
semaphore.release(); // Always return the permit. For fun, comment-out this line to see the effect of the per-device semaphore at runtime.
}
}
}
record IncomingReportsSimulatorRunnable(Map < Integer, Semaphore > deviceToSemaphoreMap ,
ExecutorService reportProcessingExecutorService ,
int countOfReportsToGeneratePerBatch ,
boolean printToConsole ,
Queue < String > fauxDatabase)
implements Runnable
{
@Override
public void run ( )
{
if ( printToConsole ) System.out.println( "INFO - Generating " + countOfReportsToGeneratePerBatch + " reports at " + Instant.now() );
// Snapshot the device identifiers once per batch, rather than once per report.
final List < Integer > deviceIdentifiers = List.copyOf( deviceToSemaphoreMap.keySet() );
for ( int i = 0 ; i < countOfReportsToGeneratePerBatch ; i++ )
{
// Make a new Runnable task containing report data to be processed, and submit this task to the executor service using virtual threads.
// To simulate a device sending in a report, we randomly pick one of the devices to pretend it is our source of report data.
int randomIndexNumber = ThreadLocalRandom.current().nextInt( 0 , deviceIdentifiers.size() );
Integer deviceIdentifier = deviceIdentifiers.get( randomIndexNumber );
Semaphore semaphore = deviceToSemaphoreMap.get( deviceIdentifier );
Runnable processReport = new ReportProcessorRunnable( semaphore , deviceIdentifier , printToConsole , fauxDatabase );
reportProcessingExecutorService.submit( processReport );
}
}
}
private void demo ( )
{
// Configure experiment.
Duration durationOfExperiment = Duration.ofSeconds( 20 );
int countOfReportsToGeneratePerBatch = 7; // Would be 40 per the Stack Overflow Question.
boolean printToConsole = true;
// To use as a concurrent list, I found this suggestion to use `ConcurrentLinkedQueue`: https://stackoverflow.com/a/25630263/642706
Queue < String > fauxDatabase = new ConcurrentLinkedQueue <>();
// Represent each of the thousand devices that are sending us report data to be processed.
// We map each device to a Java `Semaphore` object, to serialize the processing of multiple reports per device.
final int firstDeviceNumber = 1_000;
final int countDevices = 10; // Would be 1_000 per the Stack Overflow question.
final Map < Integer, Semaphore > deviceToSemaphoreMap = new TreeMap <>();
for ( int i = 0 ; i < countDevices ; i++ )
{
Integer deviceIdentifier = i + firstDeviceNumber; // Devices are numbered from 1,000: 1,000 to 1,999 for a thousand devices, 1,000 to 1,009 for the ten used here.
Semaphore semaphore = new Semaphore( 1 , true ); // A single permit to make a binary semaphore, and make it fair.
deviceToSemaphoreMap.put( deviceIdentifier , semaphore );
}
// Run experiment.
// Notice that in Project Loom the `ExecutorService` interface is now `AutoCloseable`, for use in try-with-resources syntax.
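// Resources close in reverse order of declaration: first the report generator shuts down (cancelling its
// periodic task), then the virtual-thread executor waits for every report already submitted to finish.
try (
// `newVirtualThreadExecutor` is the early-access name; in Java 21 and later use `Executors.newVirtualThreadPerTaskExecutor()`.
ExecutorService reportProcessingExecutorService = Executors.newVirtualThreadExecutor() ;
ScheduledExecutorService reportGeneratingExecutorService = Executors.newSingleThreadScheduledExecutor() ;
)
{
Runnable simulateIncomingReports = new IncomingReportsSimulatorRunnable( deviceToSemaphoreMap , reportProcessingExecutorService , countOfReportsToGeneratePerBatch , printToConsole , fauxDatabase );
ScheduledFuture < ? > scheduledFuture = reportGeneratingExecutorService.scheduleAtFixedRate( simulateIncomingReports , 0 , 1 , TimeUnit.SECONDS );
try {Thread.sleep( durationOfExperiment );} catch ( InterruptedException e ) {Thread.currentThread().interrupt();}
scheduledFuture.cancel( false ); // Stop generating new batches; a batch already in progress may finish.
}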
// Notice that by the time execution reaches this point, closing the try-with-resources block has already waited
// until all submitted tasks finished, because that is the new behavior of `ExecutorService` being `AutoCloseable`.
System.out.println( "INFO - executor services shut down at this point. " + Instant.now() );
// Results of experiment
System.out.println( "fauxDatabase.size(): " + fauxDatabase.size() );
System.out.println( "fauxDatabase = " + fauxDatabase );
}
}
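The `projectLoomIsPresent` method above only checks that a `startVirtualThread` method exists. On Java 21 or later, a more direct check may be to ask the thread running a task whether it is virtual, using `Thread.isVirtual()`. Below is a minimal sketch under that assumption. The class name `VirtualThreadCheckSketch` is hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadCheckSketch {

    // Submits one task and reports whether it ran on a virtual thread (Thread.isVirtual is Java 21+).
    static boolean taskRunsOnVirtualThread() {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<Boolean> result = executor.submit(() -> Thread.currentThread().isVirtual());
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt status before giving up.
            throw new IllegalStateException("Interrupted while waiting for the task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("main runs on a virtual thread: " + Thread.currentThread().isVirtual());
        System.out.println("task runs on a virtual thread: " + taskRunsOnVirtualThread());
    }
}
```

Run from an ordinary `main`, the first line should report `false` (the main thread is a platform thread) and the second `true`.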