Deques (2.6+)
If you know that the file is going to be small, a simple deque will do just fine.
from collections import deque
def tail(f, n):
    """Return the last `n` lines of file `f` as a deque.

    Relies on deque's `maxlen` bound: feeding the whole file through a
    bounded deque keeps only the most recent `n` lines.
    """
    return deque(f, maxlen=n)
Quote from docs.python.org:
If maxlen is not specified or is None, deques may grow to an arbitrary length. Otherwise, the deque is bounded to the specified maximum length. Once a bounded length deque is full, when new items are added, a corresponding number of items are discarded from the opposite end. Bounded length deques provide functionality similar to the tail filter in Unix. They are also useful for tracking transactions and other pools of data where only the most recent activity is of interest.
Galloping search (2.7+)
When the size of the file is unspecified, consider searching the file from the end.
Galloping, or exponential, search minimizes the number of read calls by multiplying the number of bytes to search by two each iteration.
This snippet handles edge cases well, apart from multi-byte delimiters and files opened in text mode (see "Edge cases" for an example that can handle those). It stores the segments in a memory-efficient deque until joining them, just before returning, into a single bytes object, taking care to read each byte of data only once.
from collections import deque
from os import SEEK_CUR, SEEK_END
def tail(f, n, d = b'\n'):
    """Read the last `n` segments (lines) from the end of file `f`,
    separated by delimiter `d`.

    `f` is scanned backwards in exponentially growing ("galloping")
    reads, so each byte is read at most once; the collected segments are
    kept in a deque and joined into a single `bytes` on return.

    NOTE(review): assumes `f` is a seekable file opened in byte mode and
    that `d` is a single-byte delimiter — confirm before reuse.
    """
    a = deque()  # Segments collected so far, kept in file order.
    o = 1        # Current read size; doubled on every pass.
    try:
        # Seek to end of file, exclude first byte from check for newline.
        f.seek(-1, SEEK_END)
        s = f.read(1)
        c = 0    # Delimiter count within the most recent segment.
        # Read more segments until enough newline characters have been read.
        while c < n:
            n -= c                    # Subtract newline count from remaining.
            a.appendleft(s)           # Insert segment at the beginning.
            f.seek(-o * 3, SEEK_CUR)  # Seek past the read bytes, plus 2x that.
            o *= 2                    # Multiply step- and readsize by two.
            s = f.read(o)             # Read new segment from file.
            c = s.count(d)            # Count the number of newline characters.
    except OSError:
        # Seeking before offset 0 raised OSError: the scan reached the
        # beginning of the file.  Read from the start of the file up to
        # the start of the last collected segment.
        p = max(0, f.tell() - o)
        f.seek(0)
        s = f.read(p)
        c = s.count(d)
    if c >= n:
        # Strip data, up to the start of the first line, from the last segment:
        # walk backwards over delimiters until only `n` segments remain.
        i = s.rfind(d)
        while i != -1 and n > 1:
            i = s.rfind(d, None, i)
            n -= 1
        s = s[i+1:]
    a.appendleft(s)
    return b"".join(a)
Usage:
f.write(b'Third\nSecond\nLast'); f.seek(0)
assert tail(f, 2, b'\n') == b"Second\nLast\n"
f.write(b'\n\n'); f.seek(0)
assert tail(f, 1, b'\n') == b"\n"
f.write(b'X\n'); f.seek(0)
assert tail(f, 1, b'\n') == b"X\n"
f.write(b''); f.seek(0)
assert tail(f, 1, b'\n') == b""
Edge cases (2.7+)
The simplest approach, apart from reading the whole file, is to step backwards over data from the end of the file and check each read byte, or block of bytes, against a delimiter value/character.
It is not as fast as the galloping search function above, but it is much easier to write a function that can handle edge cases like UTF-16/32 encoded files and files where other multi-byte line separators are used.
This example can, apart from that, also handle files opened in text mode (but you should still consider re-opening them in byte mode, as relative seek calls on byte-mode files are more efficient).
def _tail__bytes(f, n, sep, size, step):
# Point cursor to the end of the file.
f.seek(0, SEEK_END)
# Halt when 'sep' occurs enough times.
while n > 0:
# Seek past the byte just read, or last byte if none has been read.
f.seek(-size-step, SEEK_CUR)
# Read one byte/char/block, then step again, until 'sep' occurs.
while f.read(size) != sep:
f.seek(-size-step, SEEK_CUR)
n -= 1
def _tail__text(f, n, sep, size, step):
# Text mode, same principle but without the use of relative offsets.
o = f.seek(0, SEEK_END)
o = f.seek(o-size-step)
while n > 0:
o = f.seek(o-step)
while f.read(step) != sep:
o = f.seek(o-step)
n -= 1
def tail(f, n, sep, fixed = False):
    """tail(f: io.BaseIO, n: int, sep: bytes, fixed: bool = False) -> bytes|str

    Return the last `n` segments of file `f`, separated by `sep`.

    Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't
    forget to pass the correct delimiter) in files opened in byte mode.
    An int value for `fixed` is used as an explicit step size instead.

    Raises ValueError if `sep` is empty.  Returns the whole file when it
    contains fewer than `n` separators.
    """
    size = len(sep)
    # True -> step a whole (multi-byte) separator at a time; an explicit
    # int is used as-is; otherwise step a single byte/char.
    step = len(sep) if (fixed is True) else (fixed or 1)
    if not size:
        raise ValueError("Zero-length separator.")
    try:
        if 'b' in f.mode:
            # Process file opened in byte mode.
            _tail__bytes(f, n, sep, size, step)
        else:
            # Process file opened in text mode.
            _tail__text(f, n, sep, size, step)
    except (OSError, ValueError):
        # Scanned past the beginning of the file: rewind and return it all.
        # Fix: use the plain offset 0 — the original called
        # f.seek(0, SEEK_SET), but SEEK_SET is never imported in this
        # snippet (only SEEK_CUR and SEEK_END are), so short files raised
        # NameError here instead of being returned whole.
        f.seek(0)
    return f.read()
Usage:
f.write("X\nY\nZ\n".encode('utf32')); f.seek(0)
assert tail(f, 1, "\n".encode('utf32')[4:], fixed = True) == b"Z\n"
f.write("X\nY\nZ\n".encode('utf16')); f.seek(0)
assert tail(f, 1, "\n".encode('utf16')[2:], fixed = True) == b"Z\n"
f.write(b'X<br>Y</br>'); f.seek(0)
assert tail(f, 1, b'<br>') == b"Y</br>"
f.write("X\nY\n"); f.seek(0)
assert tail(f, 1, "\n") == "Y\n"
The examples were tested against files of varying lengths, empty files, files of various sizes consisting only of newlines, and so on before being posted. They ignore a trailing newline character.