from os import SEEK_END, SEEK_CUR

def readlast(f):
    try:
        f.seek(-2, SEEK_END)        # Jump to the second-to-last byte.
        while f.read(1) != b"\n":   # Until newline is found ...
            f.seek(-2, SEEK_CUR)    # ... jump back over the byte read, plus one.
    except OSError:                 # Reached the beginning of the file.
        f.seek(0)                   # Set cursor to the start of the file as well.
    return f.read()                 # Read all data from this point on.

with open(path, "rb") as f:
    first = f.readline()
    last  = readlast(f)
When using seek, the call format is f.seek(offset, whence=0).
Quote from docs.python.org:
Change the stream position to the given byte offset. offset is interpreted relative to the position indicated by whence. The default value for whence is SEEK_SET. Values for whence are:

- SEEK_SET or 0 – start of the stream (the default); offset should be zero or positive
- SEEK_CUR or 1 – current stream position; offset may be negative
- SEEK_END or 2 – end of the stream; offset is usually negative
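To make the three modes concrete, here's a minimal sketch (the file name is a placeholder; binary mode is assumed, since text mode only allows offsets of zero or values returned by tell()):

from os import SEEK_SET, SEEK_CUR, SEEK_END

with open("example.bin", "wb+") as f:   # Hypothetical scratch file.
    f.write(b"ABCDEF")
    f.seek(2, SEEK_SET)                 # Absolute: jump to byte 2.
    assert f.read(1) == b"C"
    f.seek(1, SEEK_CUR)                 # Relative: skip one byte forward.
    assert f.read(1) == b"E"
    f.seek(-1, SEEK_END)                # From the end: the last byte.
    assert f.read(1) == b"F"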
Galloping search (2.7+)
from collections import deque
from os import SEEK_CUR, SEEK_END

def readlast(f, d = b'\n'):
    """readlast(f: io.IOBase, d: bytes = b'\n') -> bytes

    Return the last segment of file `f`, containing data segments separated by
    `d`.
    """
    arr = deque(); step = 1; pos = -1
    try:
        # Seek to the last byte of the file and save it to arr, so that a
        # trailing delimiter isn't checked for a match.
        pos = f.seek(-1, SEEK_END)
        arr.appendleft(f.read())
        # Seek past the byte just read, plus one, to use as the first segment.
        pos = f.seek(-2, SEEK_END)
        seg = f.read(1)
        # Break when 'd' occurs; rfind() gives the index of the rightmost match.
        while seg.rfind(d) == -1:
            # Store segments with no delimiter in a memory-efficient deque.
            arr.appendleft(seg)
            # Step back in the file, past the bytes just read plus twice that.
            pos = f.seek(-step*3, SEEK_CUR)
            # Read a new segment, twice as big as the previous iteration's.
            step *= 2
            seg = f.read(step)
        # Ignore everything up to, and including, the rightmost delimiter.
        arr.appendleft(seg[seg.rfind(d)+1:])
    except OSError:
        # Reached beginning of file. Read remaining data and check for a match.
        f.seek(0)
        seg = f.read(pos)
        arr.appendleft(seg[seg.rfind(d)+1:])
    return b"".join(arr)
I'd probably go for a function that makes use of an exponentially growing step size today, and have thus added such an example here, which I'll keep alongside the original answer (for now).

It handles edge cases well, apart from multibyte delimiters and files opened in text mode (see "Edge cases" for a version that handles those).
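To illustrate how the step grows, here's a small standalone simulation (not part of the answer's code) that lists the (offset, size) of each backward read readlast() performs when no delimiter is found; since the window doubles every iteration, a file of n bytes needs O(log n) seeks instead of one per byte:

def trace_windows(n):
    windows = [(n - 1, 1)]                    # Last byte, saved unchecked.
    pos, step = n - 2, 1                      # First checked window: one byte.
    while pos >= 0:
        windows.append((pos, step))
        pos, step = pos - 2 * step, step * 2  # Next window, twice as big.
    return windows

print(trace_windows(64))
# [(63, 1), (62, 1), (60, 2), (56, 4), (48, 8), (32, 16), (0, 32)]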
Usage:
f.write(b'X\nY\nZ\n'); f.seek(0)
assert readlast(f) == b'Z\n'
f.write(b'\n\n'); f.seek(0)
assert readlast(f) == b'\n'
Edge cases (2.7+)
I've refrained from editing the original answer, as the question specifically asks for efficiency, and to respect earlier upvotes.

This version addresses all comments and issues raised over the years while preserving the logic and backward compatibility (at the cost of readability).

The issues raised and addressed at the time of writing are:
- Return empty string when parsing empty file, noted in comment by Loïc.
- Return all content when no delimiter is found, raised in comment by LazyLeopard.
- Avoid relative offsets to support text mode, raised in comment by AnotherParker.
- UTF16/UTF32 hack, noted in comment by Pietro Battiston.
Also supports multibyte delimiters.
from os import SEEK_CUR, SEEK_END, SEEK_SET

def _readlast__bytes(f, sep, size, step):
    # Point the cursor 'size' + 'step' bytes away from the end of the file.
    f.seek(0 - size - step, SEEK_END)
    # Step back 'step' bytes each iteration; halt when 'sep' occurs.
    while f.read(size) != sep:
        f.seek(0 - size - step, SEEK_CUR)

def _readlast__text(f, sep, size, step):
    # Text mode; same principle, but without the use of relative offsets.
    o = f.seek(0, SEEK_END)
    o = f.seek(o - size - step)
    while f.read(size) != sep:
        o = f.seek(o - step)

def readlast(f, sep, fixed=False):
    """readlast(f: io.IOBase, sep: bytes|str, fixed: bool = False) -> bytes|str

    Return the last segment of file `f`, containing data segments separated by
    `sep`.

    Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (and don't
    forget to pass the correct delimiter) in files opened in byte mode.
    """
    size = len(sep)
    step = size if fixed else 1
    if not size:
        raise ValueError("Zero-length separator.")
    try:
        if 'b' in f.mode:
            # Process a file opened in byte mode.
            _readlast__bytes(f, sep, size, step)
        else:
            # Process a file opened in text mode.
            _readlast__text(f, sep, size, step)
    except (OSError, ValueError):
        # Beginning of file reached.
        f.seek(0, SEEK_SET)
    return f.read()
Usage:
f.write("X\nY\nZ\n".encode('utf32'); f.seek(0)
assert readlast(f, "\n".encode('utf32')[4:]) == "Z\n"
f.write(b'X<br>Y</br>'; f.seek(0)
assert readlast(f, b'<br>', fixed=False) == "Y</br>"
Efficiency
Code used to compare against this answer (optimised version of the most upvoted answer [at the point of posting]):
with open(file, "rb") as f:
    first = f.readline()    # Read and store the first line.
    for last in f: pass     # Read all lines, keep the final value.
Results:
10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs 6.92s
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95s
"1-2 millions lines each", as the question stated, would of course increase the difference a lot more.