I have written a program to analyze logs; the intention is to filter and print the error statements in the logs:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

String master = "local[*]";
String inputFilePath = "C:/test.log";
SparkConf conf = new SparkConf().setAppName(App.class.getName())
        .setMaster(master);
JavaSparkContext context = new JavaSparkContext(conf);
// read the whole file once and print every line that contains "ERROR"
JavaRDD<String> stringRDD = context.textFile(inputFilePath);
stringRDD.filter(text -> text.contains("ERROR"))
        .collect().forEach(result -> System.out.println(result));
But the log file is being written to continuously by different processes. Here is a timeline example:
- At T1, 10 lines exist in the log file
- At T2 (after 5 sec), 5 more lines are added
- At T3 (after 5 sec), 7 more lines are added
Now my program should read the file every 5 seconds and print the error statements from the newly added lines only. Should I manually spawn a thread that keeps re-reading the file every 5 seconds (roughly like the sketch below), or is there a better way in Spark?
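By "manually spawn the thread" I mean something roughly like the sketch below: plain Java with no Spark, where a scheduled task remembers the byte offset it last reached and only scans the newly appended lines. The TailErrors class name and the offset bookkeeping are illustrative assumptions, not code I already have.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TailErrors {
    public static void main(String[] args) {
        String inputFilePath = "C:/test.log";
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long[] lastOffset = {0L}; // byte position up to which the file has already been processed

        scheduler.scheduleAtFixedRate(() -> {
            try (RandomAccessFile file = new RandomAccessFile(inputFilePath, "r")) {
                file.seek(lastOffset[0]);              // skip everything read in earlier runs
                String line;
                while ((line = file.readLine()) != null) {
                    if (line.contains("ERROR")) {
                        System.out.println(line);      // only newly appended ERROR lines reach here
                    }
                }
                lastOffset[0] = file.getFilePointer(); // remember where this run stopped
            } catch (IOException e) {
                e.printStackTrace();                   // log rotation/truncation is ignored for simplicity
            }
        }, 0, 5, TimeUnit.SECONDS);                    // poll every 5 seconds
    }
}

This would probably work, but it feels like reinventing something Spark may already provide.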
Update:
Based on googling, I tried the following, but it did not help:
SparkConf conf = new SparkConf().setAppName(App.class.getName())
        .setMaster(master);
//JavaSparkContext context = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
JavaDStream<String> stringRDD = streamingContext.textFileStream(inputFilePath);
stringRDD.filter(text -> text.contains("ERROR"))
        .foreachRDD(rdd -> rdd.collect().forEach(System.out::println)); // print each matching line, not the RDD object
streamingContext.start();            // nothing runs until the context is started
streamingContext.awaitTermination(); // keep the driver alive so batches keep firing
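From what I understand of the docs, textFileStream watches a directory for newly created files and does not pick up lines appended to an existing file, which may be why nothing happens here. A directory-based variant would look roughly like the sketch below; the C:/logs directory, the StreamingLogErrors class name and the 5-second batch interval are assumptions for illustration, not my actual setup.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingLogErrors {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName(StreamingLogErrors.class.getName())
                .setMaster("local[*]");
        // 5-second batch interval: each batch holds the files that appeared since the previous batch
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

        // watch a directory; every file created/moved into it becomes part of the next batch
        JavaDStream<String> lines = streamingContext.textFileStream("C:/logs");
        lines.filter(line -> line.contains("ERROR"))
             .foreachRDD(rdd -> rdd.collect().forEach(System.out::println));

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

If that is the intended usage, the log writer would also have to roll new files into that directory instead of appending to a single file.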