
I am using an HDFS file watcher service in my Flink streaming job to reload a config file as soon as it changes.

Source for the watcher service: HDFS file watcher

The problem is that the watcher reacts to changes anywhere in HDFS, not just in the directory I pass to it.

My code:

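```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class HdfsWatcher {  // class name is arbitrary
  public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
  {
    HdfsAdmin admin = new HdfsAdmin( URI.create("hdfs://stage.my-org.in:8020/tmp/anurag/"), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
      // take() blocks until the next batch of namespace events is available
      EventBatch events = eventStream.take();
      for( Event event : events.getEvents() ) {
        switch( event.getEventType() ) {
          case CREATE:
            System.out.print( "event type = " + event.getEventType() );
            CreateEvent createEvent = (CreateEvent) event;
            System.out.print( "  path = " + createEvent.getPath() + "\n");
            break;
          default:
            break;
        }
      }
    }
  }
}
```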

Output from the program:

```
event type = CREATE  path = /tmp/anurag/newFile.txt
event type = CREATE  path = /tmp/newFile2.txt
```

How can I make the watcher report only files in the directory passed in the URI?


Note: if you try to run this program, run it as the hdfs user; otherwise you will get an org.apache.hadoop.security.AccessControlException.

Olaf Kock
Anurag Anand

2 Answers


For now, I am using the Hadoop API to check the file every 30 seconds: I read its modification time and, if it is newer than the last one I saw, I reload the file.
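A minimal sketch of this polling approach, assuming a plain `LongSupplier` stands in for the HDFS lookup so the logic can be tested without a cluster. `ReloadTracker` and `startPolling` are hypothetical names, not part of Hadoop or Flink. Against HDFS, the supplier would wrap `fs.getFileStatus(path).getModificationTime()` and rethrow its `IOException` as an `UncheckedIOException`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Hadoop or Flink: remembers the last
// modification time it saw and reports when the file should be reloaded.
final class ReloadTracker {
    private final LongSupplier modificationTime;
    private long lastSeen = Long.MIN_VALUE;

    ReloadTracker(LongSupplier modificationTime) {
        this.modificationTime = modificationTime;
    }

    // True on the first call and whenever the modification time has
    // moved forward since the previous call.
    synchronized boolean shouldReload() {
        long current = modificationTime.getAsLong();
        if (current > lastSeen) {
            lastSeen = current;
            return true;
        }
        return false;
    }

    // Checks the file every periodSeconds and runs reload when it changed.
    // Exceptions are caught because scheduleAtFixedRate silently stops
    // rescheduling a task once it throws.
    static ScheduledExecutorService startPolling(ReloadTracker tracker, long periodSeconds, Runnable reload) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                if (tracker.shouldReload()) {
                    reload.run();
                }
            } catch (RuntimeException e) {
                System.err.println("config check failed: " + e);
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }
}
```

Usage would look like `ReloadTracker.startPolling(tracker, 30, () -> reloadConfig())`, where `reloadConfig()` is a placeholder for your own loading code; call `shutdown()` on the returned executor when the job stops. Polling trades up to 30 seconds of latency for not needing superuser rights.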

Anurag Anand

The inotify event stream is nothing more than the HDFS edit log parsed into objects. It delivers every event in the whole HDFS namespace no matter which directory you put in the URI passed to the constructor (the URI only identifies the cluster). That is also one of the reasons why you need to run this code as a member of the HDFS superuser group.

The solution is to filter the events as they arrive and keep only those under the directory you want. Something like:

```java
// Fragment: goes inside the same while (true) loop as the question's code.
EventBatch events = eventStream.take();
List<CreateEvent> filteredEvents = new ArrayList<>();
for( Event event : events.getEvents() ) {
  switch( event.getEventType() ) {
    case CREATE:
      CreateEvent createEvent = (CreateEvent) event;
      // Compare strings with startsWith/equals, never ==; the trailing "/"
      // stops "/your/desired/path2/..." from matching as well.
      if (createEvent.getPath().startsWith("/your/desired/path/")) {
        System.out.print( "event type = " + event.getEventType() );
        System.out.print( "  path = " + createEvent.getPath() + "\n");
        filteredEvents.add(createEvent);
      }
      break;
    default:
      break;
  }
}
```
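The directory check itself is just a string-prefix test on absolute paths such as those returned by `CreateEvent.getPath()`. A minimal sketch in plain Java, so it can be tested without a cluster; `DirectoryFilter` is a hypothetical helper, not a Hadoop class. It normalises the directory to end in `/` so that `/tmp/anurag2/x.txt` is not mistaken for a file inside `/tmp/anurag`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: keeps only paths that lie
// somewhere below a watched directory.
final class DirectoryFilter {
    private final String prefix;

    DirectoryFilter(String directory) {
        // "/tmp/anurag" and "/tmp/anurag/" both become "/tmp/anurag/".
        this.prefix = directory.endsWith("/") ? directory : directory + "/";
    }

    // True if path is inside the watched directory (at any depth).
    // The directory itself does not match.
    boolean matches(String path) {
        return path != null && path.startsWith(prefix);
    }

    // Returns the matching paths, preserving their original order.
    List<String> filter(List<String> paths) {
        List<String> kept = new ArrayList<>();
        for (String path : paths) {
            if (matches(path)) {
                kept.add(path);
            }
        }
        return kept;
    }
}
```

Inside the event loop, `filter.matches(createEvent.getPath())` would replace the hard-coded comparison. A file moved into the directory shows up as a `RENAME` event rather than a `CREATE`, so for those you would check `RenameEvent.getDstPath()` instead.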