I am using the HDFS file watcher service to reload a config file as soon as it changes, in my Flink streaming job.
Source for the watcher service: HDFS file watcher
The issue I am facing is that the watcher service reacts to changes across the entire HDFS namespace, rather than just the directory I pass in the URI.
My code:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
    // The path in the URI is intended to limit the watch to /tmp/anurag/
    HdfsAdmin admin = new HdfsAdmin( URI.create( "hdfs://stage.my-org.in:8020/tmp/anurag/" ), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
        // take() blocks until the NameNode delivers the next batch of events
        EventBatch events = eventStream.take();
        for( Event event : events.getEvents() ) {
            switch( event.getEventType() ) {
                case CREATE:
                    System.out.print( "event type = " + event.getEventType() );
                    CreateEvent createEvent = (CreateEvent) event;
                    System.out.print( " path = " + createEvent.getPath() + "\n" );
                    break;
                default:
                    break;
            }
        }
    }
}
Output from the program:
event type = CREATE path = /tmp/anurag/newFile.txt
event type = CREATE path = /tmp/newFile2.txt
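The only workaround I have found so far is to filter the events client-side by path prefix. A minimal sketch of what I mean (the WATCH_DIR constant and the isRelevant helper are my own additions for illustration, not part of the inotify API):

import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;

// Hypothetical helper, for illustration only: keeps just the CREATE
// events under the directory I actually care about.
public class PathFilter
{
    private static final String WATCH_DIR = "/tmp/anurag/";

    static boolean isRelevant( Event event )
    {
        if ( event.getEventType() != Event.EventType.CREATE ) {
            return false;
        }
        CreateEvent createEvent = (CreateEvent) event;
        return createEvent.getPath().startsWith( WATCH_DIR );
    }
}

Calling PathFilter.isRelevant( event ) inside the loop drops the /tmp/newFile2.txt event, but the stream still delivers every event in the namespace to my client, which is what I would like to avoid.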
Please help me resolve this issue so that I can watch only the files in the particular directory passed in the URI.
Thanks in advance.
Note: If you try to run this program, please run it as the hdfs user, otherwise you will get an org.apache.hadoop.security.AccessControlException.