
I am using Java NIO's WatchService, but I find it quite unreliable for the following use cases:

  1. When a very big file (>500 MB) is written to the watched directory and I rely on the ENTRY_CREATE event, the file is often not yet ready to be read, because it is still being written by another thread. So I normally resort to listening for ENTRY_MODIFY instead.

  2. But when thousands of small (~2 KB) files are copied to the watched directory, no ENTRY_CREATE or ENTRY_MODIFY event arrives for about 80% of them!
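For use case 1, a common workaround (not something used elsewhere in this thread) is to wait until the file stops changing before reading it. The sketch below is a heuristic under stated assumptions: `StableFileCheck` and `waitUntilStable` are hypothetical names, and it assumes the writer never pauses for longer than `pollMillis` in the middle of a write.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class StableFileCheck {
    private StableFileCheck() {}

    /**
     * Heuristic: returns true once two consecutive checks, pollMillis apart, see the same
     * size and last-modified time; returns false if that never happens within maxChecks.
     * At least two checks are needed before a file can be reported stable.
     */
    static boolean waitUntilStable(Path file, long pollMillis, int maxChecks)
            throws IOException, InterruptedException {
        long lastSize = -1;
        long lastModified = -1;
        for (int i = 0; i < maxChecks; i++) {
            if (i > 0) {
                Thread.sleep(pollMillis); // give a still-running writer time to append more data
            }
            long size = Files.size(file);
            long modified = Files.getLastModifiedTime(file).toMillis();
            if (size == lastSize && modified == lastModified) {
                return true; // unchanged across one polling interval
            }
            lastSize = size;
            lastModified = modified;
        }
        return false;
    }
}
```

If you control the writer, writing to a temporary name and then renaming with `Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)` avoids polling altogether, since the final name only appears once the content is complete.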

Has anyone else faced this? Is there a better, more reliable library, or should I simply switch to a blocking-queue implementation where the file copier adds each file name to the queue and a consumer thread handles the processing of the files?
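The queue-based alternative raised in the question could look roughly like this minimal sketch, assuming the copier runs in the same JVM and can announce each file right after closing it. `FileHandoff`, `fileReady`, `batchFinished`, `drainUntilDone` and the `DONE` sentinel are hypothetical names; only `LinkedBlockingQueue` and its `put`/`take` methods come from `java.util.concurrent`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class FileHandoff {
    // Sentinel ("poison pill") telling the consumer that the copier has finished the batch
    static final Path DONE = Paths.get("");

    // Unbounded, so put() never blocks the copier
    private final BlockingQueue<Path> queue = new LinkedBlockingQueue<>();

    /** Called by the copier only after a file has been completely written and closed. */
    void fileReady(Path file) throws InterruptedException {
        queue.put(file);
    }

    /** Called by the copier once the whole batch has been copied. */
    void batchFinished() throws InterruptedException {
        queue.put(DONE);
    }

    /** Consumer loop: blocks on take() until a file name arrives, stops at the sentinel. */
    List<Path> drainUntilDone() throws InterruptedException {
        List<Path> processed = new ArrayList<>();
        while (true) {
            Path file = queue.take();
            if (file == DONE) { // identity check against the sentinel instance
                return processed;
            }
            processed.add(file); // stand-in for the real processing, e.g. processFile(file)
        }
    }
}
```

Because a name is only enqueued after its file is closed, the consumer never sees a partially written file, and no notification can be dropped the way WatchService events can.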

Here is the code around my WatchService implementation:


            WatchService watchService = FileSystems.getDefault().newWatchService();
            Path path = Paths.get( coreProperties.getStagingLocation() );
            path.register( watchService,
                    new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                    SensitivityWatchEventModifier.MEDIUM
            );

            WatchKey key;
            while ( ( key = watchService.take() ) != null ) {
                for ( WatchEvent<?> event : key.pollEvents() ) {
                    log.info( "Event kind: {} . File affected: {}.", event.kind(), event.context() );
                    // Processing the file..
                }
                key.reset();
            }

 
saurabh.in
  • It is better to collate events and wait for publishing to end or pause. Your strategy may be failing because you are responding to every event and may be falling behind, as your event processing might be running hundreds of times for the same file edit. See more info [here](https://stackoverflow.com/questions/65223686/java-watchservice-perform-action-on-event-using-threads/65251819#65251819). – DuncG Feb 04 '22 at 18:49
  • If you don't want to miss events your `take()` loop should do nothing except queue the event for processing on another thread. – Jim Garrison Feb 05 '22 at 00:16
  • Also, you seem to be ignoring OVERFLOW events. Those would tell you when events got discarded. – Jim Garrison Feb 05 '22 at 00:27
  • Thanks for the comments. I have noticed that the watch service is indeed sensitive to the processing time per event. I also tried a ScheduledExecutorService to delay the processing and handle each notified file just once, but it looks like the watch service is affected by the synchronisation involved. I will try the LMAX Disruptor, where my Disruptor consumer will perform the delayed processing. – saurabh.in Feb 05 '22 at 12:25
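DuncG's suggestion to collate events and act only after a pause could be sketched with the JDK alone. This hypothetical `EventCollator` is not the code from the linked answer: the `take()` loop only calls `record(...)`, a single map write (in line with Jim Garrison's advice to do nothing else there), and a separate worker calls `drainQuiet(...)` periodically to collect files that have had no new event for `quietMillis`.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers the latest event time per file and releases quiet files. */
final class EventCollator {
    private final Map<Path, Long> lastSeen = new ConcurrentHashMap<>();
    private final long quietMillis;

    EventCollator(long quietMillis) {
        this.quietMillis = quietMillis;
    }

    /** Cheap enough for the take() loop: repeated events for one file just overwrite its time. */
    void record(Path file, long nowMillis) {
        lastSeen.put(file, nowMillis);
    }

    /** Called from a worker thread; returns (and forgets) files quiet for at least quietMillis. */
    List<Path> drainQuiet(long nowMillis) {
        List<Path> ready = new ArrayList<>();
        for (Map.Entry<Path, Long> e : lastSeen.entrySet()) {
            // remove(key, value) only succeeds if no newer event was recorded in the meantime
            if (nowMillis - e.getValue() >= quietMillis
                    && lastSeen.remove(e.getKey(), e.getValue())) {
                ready.add(e.getKey());
            }
        }
        return ready;
    }
}
```

Timestamps are passed in explicitly so the logic is deterministic to test; in the watcher you would pass `System.currentTimeMillis()`. On an OVERFLOW event, one option is to list the directory with `Files.list` and `record` every entry, so files whose events were discarded are still picked up.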

1 Answer


Based on the helpful comments by DuncG and Jim Garrison, I realised that WatchService is sensitive to how long each notification takes to process. I was copying 6,416 files into the watched folder, and if I did anything more than log the ENTRY_XX event, it missed many of the updates.

Here is what worked for me:

  1. While handling ENTRY_XX events, I write each file name to an LMAX Disruptor whose ring buffer is larger than the maximum number of files expected in a batch. I set it to 2^19, i.e. 524,288 slots, which is enough to handle around 50k file updates without blocking, assuming each file produces about 10 WatchService notifications.

[PS: writing to a simple ExecutorService queue didn't help because of the latency of thread synchronisation. I only got 1,273 file names out of the 6,416!]
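The sizing arithmetic in item 1 can be checked with a small helper: 50,000 files × 10 notifications per file = 500,000 slots, and the next power of two is 2^19 = 524,288. The Disruptor only accepts power-of-two ring buffer sizes, so the helper rounds up. `RingBufferSizing.ringBufferSize` is a hypothetical name, and the 10-events-per-file figure is the answer's own assumption.

```java
final class RingBufferSizing {
    private RingBufferSizing() {}

    /**
     * Smallest power of two >= filesPerBatch * eventsPerFile.
     * The Disruptor only accepts power-of-two ring buffer sizes.
     */
    static int ringBufferSize(int filesPerBatch, int eventsPerFile) {
        if (filesPerBatch < 0 || eventsPerFile < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        long needed = (long) filesPerBatch * eventsPerFile; // long avoids int overflow
        if (needed <= 1) {
            return 1;
        }
        if (needed > (1 << 30)) {
            throw new IllegalArgumentException("too many slots for an int-sized buffer: " + needed);
        }
        // The highest set bit of (needed - 1), doubled, is the next power of two >= needed
        return Integer.highestOneBit((int) (needed - 1)) << 1;
    }
}
```

For the 6,416-file test batch, the same rule gives 64,160 slots rounded up to 65,536, so 2^19 leaves a wide margin.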

  2. I also had to warm up the ring buffer, or else I was still missing some WatchService updates. I fill all its slots with test messages and then send an async event to the function that copies the 6,416 files to the watched folder.

Sample code:

// Publishing file names from WatchService events to the Disruptor ring buffer
 
    private void watchStagingFolder() {
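        try ( WatchService watchService = FileSystems.getDefault().newWatchService() ) {
            Path path = Paths.get( coreProperties.getStagingLocation() );
            path.register( watchService,
                    new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                    SensitivityWatchEventModifier.HIGH
            );

            while ( true ) {
                // take() blocks until a key is signalled; it never returns null
                WatchKey key = watchService.take();
                log.info( "key found: {}", key );
                for ( WatchEvent<?> event : key.pollEvents() ) {
                    WatchEvent.Kind<?> kind = event.kind();
                    log.info( "Event kind: {} . File affected: {}", kind.name(), event.context() );
                    if ( kind == ENTRY_CREATE || kind == ENTRY_MODIFY ) {
                        // Keep this loop cheap: only hand the file name over to the Disruptor
                        String fileName = event.context().toString();
                        log.info( "File to be processed: {}", fileName );
                        fileProcessorDisruptorEventProducer.send( fileName );
                    } else if ( kind == OVERFLOW ) {
                        // OVERFLOW is delivered without being registered and means events were discarded
                        log.warn( "WatchService overflow: some events were lost." );
                    }
                }
                // reset() returns false once the key is no longer valid, e.g. the directory was deleted
                if ( !key.reset() ) {
                    log.warn( "Watch key is no longer valid; stopping the watcher." );
                    break;
                }
            }
        } catch ( InterruptedException e ) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
        } catch ( Exception e ) {
            log.error( "Found error while watching the staging directory.", e );
        }
    }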

// Ensuring the Disruptor ring buffer is warmed up
@Component
@RequiredArgsConstructor
@Slf4j
public class DisruptorWarmer {
    public static final String TEST_FILE_NAME = "TEST_FILE_NAME";
    private final CoreProperties coreProperties;
    private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;

    @PostConstruct
    public void init() {
        int bufferSize = coreProperties.getDisruptor().getBufferSize();
        for ( int i = 0; i < bufferSize; i++ ) {
            fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
        }
        log.info( "Warmed up disruptor with {} test messages.", bufferSize );
    }
}

// processing files in the Disruptor consumer/handler
    @Override
    public void onEvent( Msg msg, long sequence, boolean endOfBatch ) {
        try {
            if ( count < bufferSize ) {
                // Still consuming the warm-up messages sent by DisruptorWarmer
                log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
                count++;
                if ( count == bufferSize ) {
                    // Last warm-up message consumed: signal that the files may now be copied
                    log.info( "Disruptor warmed up now with {} test messages.", count );
                    newSingleThreadExecutor.submit( () ->
                            applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
                    );
                }
            } else {
                log.debug( "File: {}", msg.getPayload() );
                // No longer worried about slow processing impacting the watch service
                processFile( ( String ) msg.getPayload() );
            }
        } catch ( RuntimeException rte ) {
            log.error( "Found error while processing msg: [{}]. Skipping to next message.", msg, rte );
        }
    }