Based on the helpful comments by DuncG and Jim Garrison, I realised that WatchService is sensitive to how long each notification takes to process. I was copying 6,416 files into the folder it was watching, and if the handler did anything more than log the ENTRY_XX event, it missed many of the updates.
Here is what worked for me:
- While handling ENTRY_XX events, I write the file name to an LMAX Disruptor whose ring buffer is sized higher than the maximum number of files expected in a batch. I set it to 2^19, i.e. 524,288 slots, which is enough to absorb 50k or more file updates without blocking, assuming each file produces around 10 WatchService notifications.
[PS: handing the names off to a plain ExecutorService queue didn't help, because of the latency around thread synchronisation. I only got 1,273 file names out of the 6,416!]
- I also had to warm up the ring buffer, or else I was still missing some WatchService updates. I fill all of its slots with test messages and then publish an async event to the function that copies the 6,416 files into the watched folder.
Sample code:
// publishing file names from WatchService events to the Disruptor ring buffer
private void watchStagingFolder() {
    try {
        WatchService watchService = FileSystems.getDefault().newWatchService();
        Path path = Paths.get( coreProperties.getStagingLocation() );
        path.register( watchService,
                new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                SensitivityWatchEventModifier.HIGH
        );
        WatchKey key;
        while ( ( key = watchService.take() ) != null ) {
            log.info( "key found: {}", key );
            for ( WatchEvent<?> event : key.pollEvents() ) {
                String eventKindStr = event.kind().name();
                log.info( "Event kind: {} . File affected: {}", eventKindStr, event.context() );
                if ( event.kind().equals( ENTRY_CREATE ) || event.kind().equals( ENTRY_MODIFY ) ) {
                    String fileName = event.context().toString();
                    log.info( "File to be processed: {}", fileName );
                    // hand off immediately; all heavy lifting happens on the Disruptor consumer thread
                    fileProcessorDisruptorEventProducer.send( fileName );
                } else {
                    log.info( "Ignoring event kind {}", event.kind() );
                }
            }
            // stop watching if the key is no longer valid (e.g. the directory was deleted)
            if ( !key.reset() ) {
                break;
            }
        }
    } catch ( Exception e ) {
        log.error( "Found error while watching the staging directory.", e );
    }
}
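For completeness, here is a rough sketch of how the Msg event and the fileProcessorDisruptorEventProducer used above could be wired with the Disruptor DSL. The class shape and constructor below are illustrative rather than my exact code; the points that matter are the power-of-two buffer size (2^19) and that send() only publishes the file name to the ring buffer, so the WatchService thread does almost no work per event:
// sketch only: wiring the Msg event, ring buffer and producer (names are illustrative)
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

class Msg {
    private Object payload;
    public Object getPayload() { return payload; }
    public void setPayload( Object payload ) { this.payload = payload; }
}

public class FileProcessorDisruptorEventProducer {

    // copies the file name into the pre-allocated slot claimed for this publish
    private static final EventTranslatorOneArg<Msg, String> TRANSLATOR =
            ( msg, sequence, fileName ) -> msg.setPayload( fileName );

    private final RingBuffer<Msg> ringBuffer;

    public FileProcessorDisruptorEventProducer( EventHandler<Msg> fileProcessorEventHandler, int bufferSize ) {
        // bufferSize must be a power of two, e.g. 2^19 = 524,288
        Disruptor<Msg> disruptor =
                new Disruptor<>( Msg::new, bufferSize, DaemonThreadFactory.INSTANCE );
        disruptor.handleEventsWith( fileProcessorEventHandler ); // the onEvent(...) handler shown below
        this.ringBuffer = disruptor.start();
    }

    // called from the WatchService loop above; this is all the watcher thread has to do per file
    public void send( String fileName ) {
        ringBuffer.publishEvent( TRANSLATOR, fileName );
    }
}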
// ensuring the Disruptor ring buffer is warmed up
@Component
@RequiredArgsConstructor
@Slf4j
public class DisruptorWarmer {

    public static final String TEST_FILE_NAME = "TEST_FILE_NAME";

    private final CoreProperties coreProperties;
    private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;

    @PostConstruct
    public void init() {
        int bufferSize = coreProperties.getDisruptor().getBufferSize();
        for ( int i = 0; i < bufferSize; i++ ) {
            fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
        }
        log.info( "Warmed up disruptor with {} test messages.", bufferSize );
    }
}
// processing files in the Disruptor consumer/handler
@Override
public void onEvent( Msg msg, long sequence, boolean endOfBatch ) {
    try {
        if ( count < bufferSize - 1 ) {
            log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
            count++;
        } else if ( count == bufferSize - 1 ) {
            log.info( "Disruptor warmed up now with {} test messages.", count + 1 );
            // signal readiness asynchronously so this handler thread is not held up
            newSingleThreadExecutor.submit( () ->
                    applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
            );
            count++;
        } else {
            log.debug( "File: {}", msg.getPayload() );
            // no longer worried about slow processing impacting the watch service
            processFile( ( String ) msg.getPayload() );
        }
    } catch ( RuntimeException rte ) {
        log.error( "Found error while processing msg: [{}]. Skipping to next message.", msg, rte );
    }
}
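Finally, a sketch of the listener that reacts to FileProcessorDisruptorReadyEvent and only then starts flooding the watched folder. FileCopier and copyBatchToStagingFolder() are illustrative names standing in for my actual copy routine:
// sketch only: start the copy once the ring buffer is warmed up (FileCopier is an illustrative name)
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class FileProcessorDisruptorReadyListener {

    private final FileCopier fileCopier; // hypothetical component that copies the batch of files

    @EventListener
    public void onReady( FileProcessorDisruptorReadyEvent event ) {
        log.info( "Disruptor is warmed up, starting the file copy." );
        // only now start writing into the watched folder, so no warm-up slot is wasted on a real file
        fileCopier.copyBatchToStagingFolder();
    }
}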