3

I have a directory containing gzip compressed log files with one event per line. In order to read and process these real-time, I have created a WatcherService that are identical to the code listed here: http://docs.oracle.com/javase/tutorial/essential/io/notification.html

In the processEvents() method, I have added this code to read the files that have been added or appended, line by line:

if (kind == ENTRY_MODIFY) {
    try(BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(Files.newInputStream(child, StandardOpenOption.READ))))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    }
    catch(EOFException ex) {
        //file is empty,  so ignore until next signal
    }
    catch(Exception ex) {
        ex.printStackTrace();
    }
}

Now, as you can imagine, this works great for files that are created written and closed within milliseconds, however, when working with large files that are appended over time, this will read the entire file over and over again for every appended line (given that the file is flushed and synced by the producer now and then).

Is there any way I can read only the new lines in this file every time a ENTRY_MODIFY signal is sent, or find out when the file is "complete"?

How do I deal with files that are not appended, but rather overwritten?

agnsaft
  • 1,791
  • 7
  • 30
  • 49
  • 1
    Take a look at http://stackoverflow.com/q/937747/3080094 One of the answers might help you find what you are looking for. – vanOekel Jul 12 '14 at 18:09

1 Answers1

5

First I would like to answer the technical aspect of your question:

A WatchEvent just gives you the file name of a changed (or created or deleted) file and nothing more. So if you need any logic beyond this you have to be implement it on your own (or use an existing library of course).

If you want to read only new lines, you have to remember the position for each file and whenever this file is changed you can move to the last known position. To get the current position you could use a CountingInputStream from the Commons IO package (credits go to [1]). To jump to the last position, you can use the function skip.

But you are using a GZIPInputStream, this means that skip will not give you a great performance boost since skipping a compressed stream is not possible. Instead GZIPInputStream skip will uncompress the stream as it would when you are reading it so you will experience only little performance improvements (try it!).

What I don't understand is why you are using compressed log files at all? Why don't you write uncompressed logs with a DailyRollingFileAppender and compress it at the end of the day, when the application doesn't access it anymore?

Another solution could be to keep the GZIPInputStream (store it) so that you don't have to reread the file again. It may depend on how many log files you have to watch to decide if this is reasonable.

Now some questions on your requirements:

You didn't mention the reason why you want to watch the log files in real time. Why don't you centralize your logs (see Centralised Java Logging)? For example take a look on logstash and this presentation (see [2] and [3]) or on scribe or on splunk, which is commercial (see [4]).

A centralized log would give you the opportunity to really have real time reactions based on your log data.

[1] https://stackoverflow.com/a/240740/734687
[2] Using elasticsearch, logstash & kibana to create realtime dashboards - slides
[3] Using elasticsearch, logstash & kibana to create realtime dashboards - video
[4] Log Aggregation with Splunk - slides

Update

First, a Groovy script to generate a zipped log file. I start this script from GroovyConsole each time I want to simulate a log file change:

// Run with GroovyConsole each time you want new entries
def file = new File('D:\\Projekte\\watcher_service\\data\\log.gz')

// reading previous content since append is not possible
def content
if (file.exists()) {
    def inStream = new java.util.zip.GZIPInputStream(file.newInputStream())
    content = inStream.readLines()
}

// writing previous content and append new data
def random  = new java.util.Random()  
def lineCount = random.nextInt(30) + 1
def outStream = new java.util.zip.GZIPOutputStream(file.newOutputStream())

outStream.withWriter('UTF-8') { writer ->
    if (content) {
        content.each { writer << "$it\n" }
    }
    (1 .. lineCount).each {
        writer.write "Writing line $it/$lineCount\n"
    }
    writer.write '---Finished---\n'
    writer.flush()
    writer.close()
}

println "Wrote ${lineCount + 1} lines."

Then the logfile reader:

import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.util.zip.GZIPInputStream
import org.apache.commons.io.input.CountingInputStream
import static java.nio.file.StandardWatchEventKinds.*

class LogReader
{
    private final Path dir = Paths.get('D:\\Projekte\\watcher_service\\data\\')
    private watcher
    private positionMap = [:]
    long lineCount = 0

    static void main(def args)
    {
        new LogReader().processEvents()
    }

    LogReader()
    {
        watcher = FileSystems.getDefault().newWatchService()
        dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
    }

    void processEvents()
    {
        def key = watcher.take()
        boolean doLeave = false

        while ((key != null) && (doLeave == false))
        {
            key.pollEvents().each { event ->
                def kind = event.kind()
                Path name = event.context()

                println "Event received $kind: $name"
                if (kind == ENTRY_MODIFY) {
                    // use position from the map, if entry is not there use default value 0
                    processChange(name, positionMap.get(name.toString(), 0))
                }
                else if (kind == ENTRY_CREATE) {
                    processChange(name, 0)
                }
                else {
                    doLeave = true
                    return
                }
            }
            key.reset()
            key = watcher.take()
        }
    }

    private void processChange(Path name, long position)
    {
        // open file and go to last position
        Path absolutePath = dir.resolve(name)
        def countingStream =
                new CountingInputStream(
                new GZIPInputStream(
                Files.newInputStream(absolutePath, StandardOpenOption.READ)))
        position = countingStream.skip(position)
        println "Moving to position $position"

        // processing each new line
        // at the first start all lines are read
        int newLineCount = 0
        countingStream.withReader('UTF-8') { reader ->
            reader.eachLine { line ->
                println "${++lineCount}: $line"
                ++newLineCount
            }
        }
        println "${++lineCount}: $newLineCount new lines +++Finished+++"

        // store new position in map
        positionMap[name.toString()] = countingStream.count
        println "Storing new position $countingStream.count"
        countingStream.close()
    }
}

In the function processChange you can see 1) the creation of the inputstreams. The line with the .withReader creates the InputStreamReader and the BufferedReader. I use always Grovvy, it is Java on stereoids and when you start using it, you cannot stop. A Java developer should be able to read it, but if you have questions just comment.

Community
  • 1
  • 1
ChrLipp
  • 15,526
  • 10
  • 75
  • 107
  • Thank you for your input. Regarding the compressed log files, this is caused by a third party proprietary tool that store its files like that, unfortunately. Regarding your comment about centralized log storage, this is what I am trying to achieve by extending Flume with a custom input for this particular case :) – agnsaft Jul 15 '14 at 07:18
  • Ok, I see :-) So you can forget the second part of my answer, but I don't delete it if this is ok for you. Is the first part detailed enough? – ChrLipp Jul 15 '14 at 11:56
  • Yes. One question though... if I do use a countinginputstream wrapped in a gzipinputstream and then bufferedreader, I probably cannot do operations directly on the countinginputstream without messing things up with the gzipinputstream and bufferedreader? – agnsaft Jul 15 '14 at 13:40
  • `CountingInputStream` is just a decorator, so you use it as wrapper for `GZIPInputStream`. You need to provide a variable for the stream to access it but you wrap the variable in a `InputStreamReader` as you did before. The only reason for the `CountingInputStream` is to provide the amounts of chars in the stream. I will try to provide a sample. – ChrLipp Jul 15 '14 at 22:14
  • But, if I wrap CountingInputStream in a BufferedReader, won't the counter be "wrong" if the reader has buffered any data that has not been processed by the application. e.g., BufferedReader reads 1000 bytes, 900 of these are full lines and are processed while the last 100 are unprocessed and just in the buffer. In this situation, wouldnt the CountingInputStream count 1000 while the actual number should be 900 if I want to skip later? – agnsaft Jul 16 '14 at 06:39
  • This is theoretically true but I am not sure if this will occur in your usage scenario. Alternatively you have still the second option (keeping the `GZIPInputStream` for each file). Google Guave also has a CountingInputStream, but I didn't find an implementation within a single class. – ChrLipp Jul 16 '14 at 08:06
  • Thanks for the bounty! Did you solve it already? I have finished my code, but get an EOF exception when I open the file the second time (with and without skipping). Should I post my code prior to solving this problem? – ChrLipp Jul 17 '14 at 08:09
  • 1
    I did not solve it yet, but you were so helpful so I decided to award the bounty anyway. – agnsaft Jul 18 '14 at 14:59
  • Updated the code, hope you can read it. What I didn't try is to store the whole `GZIPInputStream` instead of the position. If you want to get some Groovy introduction, try http://groovykoans.org/ – ChrLipp Jul 19 '14 at 08:11