
I have written a program to analyze logs, where the intention is to filter and print the error statements in the logs:

    String master = "local[*]";

    String inputFilePath = "C:/test.log";

    SparkConf conf = new SparkConf().setAppName(App.class.getName())
            .setMaster(master);
    JavaSparkContext context = new JavaSparkContext(conf);

    // read the whole log file once into an RDD of lines
    JavaRDD<String> stringRDD = context.textFile(inputFilePath);

    // keep only the lines containing ERROR and print them on the driver
    stringRDD.filter(text -> text.contains("ERROR"))
            .collect().forEach(System.out::println);

But the log file is being written to continuously by different processes. Here is an example timeline:

  1. At T1, 10 lines exist in the log file
  2. At T2 (after 5 sec), 5 more lines are added
  3. At T3 (after 5 sec), 7 more lines are added

Now my program should read the file every 5 seconds and print the error statements from the newly added lines only. Should I manually spawn a thread that keeps re-reading the file every 5 seconds (a rough sketch of what I mean is below), or is there a better way in Spark?
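
For context, the manual approach I have in mind is roughly the sketch below (the class name and the offset bookkeeping are my own placeholders, not from any library):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ManualLogPoller {

        public static void main(String[] args) {
            String inputFilePath = "C:/test.log";
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // remembers how far into the file the previous run got
            long[] lastOffset = {0L};

            scheduler.scheduleAtFixedRate(() -> {
                try (RandomAccessFile file = new RandomAccessFile(inputFilePath, "r")) {
                    file.seek(lastOffset[0]);              // skip lines processed in earlier runs
                    String line;
                    while ((line = file.readLine()) != null) {
                        if (line.contains("ERROR")) {
                            System.out.println(line);      // only newly appended ERROR lines
                        }
                    }
                    lastOffset[0] = file.getFilePointer(); // remember the position for the next run
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, 0, 5, TimeUnit.SECONDS);
        }
    }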

Update:

Based on Google searches I tried the below, but it did not help:

    SparkConf conf = new SparkConf().setAppName(App.class.getName())
            .setMaster(master);
    //JavaSparkContext context = new JavaSparkContext(conf);
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));

    JavaDStream<String> stringRDD = streamingContext.textFileStream(inputFilePath);
    stringRDD.filter(text -> text.contains("ERROR")).foreachRDD(result -> System.out.println(result));
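
As far as I can tell from the Spark Streaming programming guide, the streaming context also has to be started and kept alive, and textFileStream monitors a directory for new files rather than tailing a single file. So a fuller sketch of this attempt would look roughly like the following (the C:/logs/ directory is just a placeholder of mine):

    SparkConf conf = new SparkConf().setAppName(App.class.getName())
            .setMaster(master);
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));

    // textFileStream watches a directory for NEW files; lines appended to an existing file are not picked up
    JavaDStream<String> lines = streamingContext.textFileStream("C:/logs/");

    lines.filter(text -> text.contains("ERROR"))
            .foreachRDD(rdd -> rdd.collect().forEach(System.out::println));

    // nothing runs until the context is started
    streamingContext.start();
    streamingContext.awaitTermination();
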
  • This is a prime use case for Spark Structured Streaming. Watch Michael Armbrust's presentation at Spark Summit 2017 Boston where he shows off a nearly identical use case of tracking errors using Structured Streaming. Also, if your logs are small, Spark may very well not be a good option at all. Scala is preferred for Spark over Java, but that's ultimately up to you. – Garren S Jun 06 '17 at 03:58
  • @Garren Are you referring to the video https://www.youtube.com/watch?list=PLTPXxbhUt-YVEyOqTmZ_X_tpzOlJLiU2k&v=IJmFTXvUZgY ? Regarding Scala, I actually don't have any experience in Scala :( – scott miles Jun 06 '17 at 04:09
  • Yep, that's the video. Scala is fairly easy to pickup. IntelliJ can even auto-convert (with some manual refinements of course) Java code to Scala code for you. – Garren S Jun 06 '17 at 04:17
  • https://forums.databricks.com/questions/11723/unable-to-read-the-streaming-data-from-the-single.html - this is a highly relevant open question on Databricks ("creators" of Spark). Java itself can use the Apache Commons Tailer class to keep reading the new file contents in: https://stackoverflow.com/questions/557844/java-io-implementation-of-unix-linux-tail-f (a sketch of the Tailer approach follows these comments) – Garren S Jun 06 '17 at 04:24
  • Thanks Garren. My question is exactly the same as the question you pointed to on the Databricks forum. But I am really surprised that I did not find an answer on Google, as it is a very basic use case for Spark. Will wait for that question to be answered. Also, I am not getting how in Java Spark I can distribute the processing over more than one node (basically a simple example of clustering)? – scott miles Jun 06 '17 at 04:32
  • Normally logs are rotated based on number of lines, bytes, time, etc. Reading a single file that's constantly being appended to and is simultaneously somehow big enough to be worth distributing on a cluster sounds off. Either you have new files being written so as to rotate your logs (maybe each log file is max 100MB) OR you write to a single log file (under say a couple gigs) and use a lighter system (whether Java, Python, etc) to read and process the logs. Throw the log file into HDFS or read from local FS, then run your job on it. You could indeed loop every N seconds and re-process. – Garren S Jun 06 '17 at 04:40
  • For reference: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts – scott miles Jun 06 '17 at 07:10
  • Unbounded table != unbounded file. There may well be a way to use Kafka or some other solution to turn your file into a stream and I'm sure if you provide that input back here, many others (including myself) would find that interesting. – Garren S Jun 06 '17 at 17:24
  • @Garren In the API `new JavaStreamingContext(conf, Durations.seconds(10))`, does the duration mean Spark will read the input stream every 10 seconds? Also, I am trying to read the file from the local file system; does the file need to be present in HDFS instead? – scott miles Jun 07 '17 at 04:17
  • When run locally it will be read from local file system. In yarn deploy mode it will look in HDFS. I'm not sure how streaming behaves with a file in this situation. – Garren S Jun 07 '17 at 04:21
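
Following up on the Apache Commons Tailer suggestion in the comments, here is a minimal sketch of that approach (the ErrorTailer class name is my own, and it assumes the commons-io dependency is on the classpath):

    import java.io.File;

    import org.apache.commons.io.input.Tailer;
    import org.apache.commons.io.input.TailerListenerAdapter;

    public class ErrorTailer {

        public static void main(String[] args) {
            TailerListenerAdapter listener = new TailerListenerAdapter() {
                @Override
                public void handle(String line) {
                    // called for every newly appended line
                    if (line.contains("ERROR")) {
                        System.out.println(line);
                    }
                }
            };

            // poll C:/test.log every 5000 ms, starting from the current end of the file
            Tailer tailer = new Tailer(new File("C:/test.log"), listener, 5000, true);
            tailer.run();   // blocks the current thread; could also be run on its own thread
        }
    }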
