A bit late to the party, but let's see what I can contribute.
First, a disclosure: my favorite is @JohnHennig's double-.replace() method, because it is reasonably fast and it is crystal clear what is happening.
I think there are no other simple and fast solutions within standard Python beyond what was already proposed in the other answers (some of which I slightly modified to obtain the exact same result as the double-.replace()).
However, it may be possible to speed things up. Here I propose 3 additional solutions: two use Cython and one uses Numba.
For simplicity, I wrote this with IPython using the Cython magic.
%load_ext Cython
The core idea is that it is sufficient to loop through the input only once, as long as we copy the data to another string on the go.
Coding this straight away in Python is simple, but to make it feasible we need to use bytearray() to overcome the immutability of str / bytes.
The slow loop can be compiled for speed with Cython (unl_loop_cy()).
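As a minimal illustration of why bytearray() is needed here (bytes does not support item assignment):

buf = bytearray(b'abc')
buf[0] = ord('x')  # in-place assignment is allowed on bytearray, not on bytes
print(bytes(buf))
# b'xbc'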
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


def unl_loop_cy(b):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    n = len(b)
    result = bytearray(n)  # preallocate: the output is never longer than the input
    i = j = 0
    while i + 1 <= n:
        if b[i] == nl_cr:
            result[j] = nl_lf
            # skip two input bytes for '\r\n', one for a lone '\r'
            i += 2 if b[i + 1] == nl_lf else 1
        else:
            result[j] = b[i]
            i += 1
        j += 1
    return bytes(result[:j])
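Once the cell is compiled, the function can be called like any regular Python function, e.g.:

print(unl_loop_cy(b'a\rb\r\nc'))
# b'a\nb\nc'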
However, neither bytes nor bytearray is compatible with Numba.
To use Numba, we need to go through NumPy, which offers efficient means of dealing with bytes: np.frombuffer() and np.ndarray.tobytes().
The base algorithm stays the same and the code now reads:
import numpy as np
import numba as nb


@nb.jit
def _unl_loop_nb(b, result):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    n = len(b)
    i = j = 0
    while i + 1 <= n:
        if b[i] == nl_cr:
            result[j] = nl_lf
            i += 2 if b[i + 1] == nl_lf else 1
        else:
            result[j] = b[i]
            i += 1
        j += 1
    return j


def unl_loop_nb(b):
    arr = np.frombuffer(b, np.uint8)
    result = np.empty(arr.shape, np.uint8)
    size = _unl_loop_nb(arr, result)
    return result[:size].tobytes()
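As a side note, np.frombuffer() creates a zero-copy uint8 view of the input bytes, while .tobytes() copies the result back, e.g.:

arr = np.frombuffer(b'a\r\nb', np.uint8)
print(arr)
# [97 13 10 98]
print(arr.tobytes())
# b'a\r\nb'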
With newer versions of Numba adding support for bytes and np.empty(), one can write an improved version of the above:
import numpy as np
import numba as nb


@nb.jit
def _unl_loop_nb2(b):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    n = len(b)
    result = np.empty(n, dtype=np.uint8)
    i = j = 0
    while i + 1 <= n:
        if b[i] == nl_cr:
            result[j] = nl_lf
            i += 2 if b[i + 1] == nl_lf else 1
        else:
            result[j] = b[i]
            i += 1
        j += 1
    return result[:j]


def unl_loop_nb2(b):
    return _unl_loop_nb2(b).tobytes()
Finally, we can optimize the Cython solution further to get additional speed. To do this, we replace bytearray with an actual C++ string and push as much of the computation as possible "outside of Python".
%%cython --cplus -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


from libcpp.string cimport string


cdef extern from *:
    """
    #include <string>

    std::string & erase(
            std::string & s,
            std::size_t pos,
            std::size_t len) {
        return s.erase(pos, len); }
    """
    string& erase(string& s, size_t pos, size_t len)


cpdef string _unl_cppstr_cy(string s):
    cdef char nl_lf = b'\n'
    cdef char nl_cr = b'\r'
    cdef char null = b'\0'
    cdef size_t s_size = s.size()
    cdef string result = string(s_size, null)
    cdef size_t i = 0
    cdef size_t j = 0
    while i + 1 <= s_size:
        if s[i] == nl_cr:
            result[j] = nl_lf
            if s[i + 1] == nl_lf:
                i += 1
        else:
            result[j] = s[i]
        j += 1
        i += 1
    # trim the unused tail of the preallocated string
    return erase(result, j, i - j)


def unl_cppstr_cy(b):
    return _unl_cppstr_cy(b)
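As before, the Python-facing wrapper takes plain bytes (Cython converts them to std::string and back automatically), e.g.:

print(unl_cppstr_cy(b'one\r\ntwo\rthree'))
# b'one\ntwo\nthree'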
The C++-optimized solution and the Numba-accelerated approaches come out with quite competitive timings, and they outperform the double-.replace() method (for sufficiently large inputs in the case of Numba).
For somewhat smaller inputs, the C++-optimized approach comes out as the fastest, but for large enough inputs the Numba-based approaches (particularly the second one) get even faster.
The Cython-accelerated bytearray approach comes out as the slowest in the considered benchmarks. Yet it is noteworthy that it stays competitive with the other solutions, despite the explicit looping, because of the Cython compilation.
The benchmarks come out as follows:

[plot: benchmark timings for all tested functions]

and, zooming in on the fastest methods:

[plot: benchmark timings, zoomed in on the fastest functions]
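For reference, a minimal sketch of how such timings can be collected (the function names are those defined in this answer, gen_input() is reported below, and the actual numbers will vary with the machine):

import timeit

funcs = (unl_replace, unl_join, unl_join_new, unl_re,
         unl_loop_cy, unl_loop_nb, unl_loop_nb2, unl_cppstr_cy)
s = gen_input(100_000)
for func in funcs:
    time = timeit.timeit(lambda: func(s), number=10) / 10
    print(f'{func.__name__:>16s}  {time * 1e3:8.3f} ms')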
For completeness, here are the other tested functions:
def unl_replace(s):
    return s.replace(b'\r\n', b'\n').replace(b'\r', b'\n')


# EDIT: was originally the commented code, but it is less efficient
# def unl_join(s):
#     nls = b'\r\n', b'\r', b'\n'
#     return b'\n'.join(s.splitlines()) + (
#         b'\n' if any(s.endswith(nl) for nl in nls) else b'')
def unl_join(s):
    result = b'\n'.join(s.splitlines())
    nls = b'\r\n', b'\r', b'\n'
    if any(s.endswith(nl) for nl in nls):
        result += b'\n'
    return result


# Following @VPfB suggestion
def unl_join_new(s):
    return b'\n'.join((s + b'\0').splitlines())[:-1]


import re


def unl_re(s, match=re.compile(b'\r\n?')):
    return match.sub(b'\n', s)


def unl_join_naive(s):  # NOTE: not same result as `unl_replace()`
    return b'\n'.join(s.splitlines())
and this is the function used to generate the inputs:
import random


def gen_input(num, nl_factor=0.10):
    nls = b'\r\n', b'\r', b'\n'
    words = (b'a', b'b', b' ')
    random.seed(0)
    nl_percent = int(100 * nl_factor)
    base = words * (100 - nl_percent) + nls * nl_percent
    return b''.join([base[random.randint(0, len(base) - 1)] for _ in range(num)])
and the scripts for generating the data and the plots (originally based on this) are available here.
Note
I have also tested a couple of other possible implementations with explicit looping, but I have omitted them from the comparisons because they were orders of magnitude slower than the proposed solutions (and remained slower even after compiling with Cython). I report them here for future reference:
def unl_loop(b):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    n = len(b)
    result = bytearray(n)
    i = j = 0
    while i + 1 <= n:
        if b[i] == nl_cr:
            result[j] = nl_lf
            i += 2 if b[i + 1] == nl_lf else 1
        else:
            result[j] = b[i]
            i += 1
        j += 1
    return bytes(result[:j])


def unl_loop_add(b):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    result = b''
    i = 0
    while i + 1 <= len(b):
        if b[i] == nl_cr:
            result += b'\n'
            i += 2 if b[i + 1] == nl_lf else 1
        else:
            result += b[i:i + 1]
            i += 1
    return result


def unl_loop_append(b):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    result = bytearray()
    i = 0
    while i + 1 <= len(b):
        if b[i] == nl_cr:
            result.append(nl_lf)
            i += 2 if b[i + 1] == nl_lf else 1
        else:
            result.append(b[i])
            i += 1
    return bytes(result)


def unl_loop_del(b):
    nl_cr = b'\r'[0]
    nl_lf = b'\n'[0]
    b = bytearray(b)
    i = 0
    while i + 1 <= len(b):
        if b[i] == nl_cr:
            if b[i + 1] == nl_lf:
                del b[i]
            else:
                b[i] = nl_lf
        i += 1
    return bytes(b)
(EDIT: comments on assumptions / potential issues)
Assumptions / Potential Issues
For "mixed newlines" files, e.g. b'alpha\nbravo\r\ncharlie\rdelta'
, there will always be a theoretical ambiguity of whether \r\n
is to be considered as 1 or 2 newlines.
All the methods implemented above will have the same behavior and consider \r\n
as a single newline.
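For example, using the double-.replace() method (all the other functions above give the same result):

s = b'alpha\nbravo\r\ncharlie\rdelta'
print(unl_replace(s))
# b'alpha\nbravo\ncharlie\ndelta'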
Additionally, all these methods will have issues with spurious occurrences of \r and/or \r\n in complex encodings. For example (taken from @JohnHennig's comments), the Malayalam letter ഊ encodes to b'\r\n' in UTF-16, bytes.splitlines() seems NOT to be aware of it, and all the tested methods behave the same:
s = 'ഊ\n'.encode('utf-16')
print(s)
# b'\xff\xfe\n\r\n\x00'

print(s.splitlines())
# [b'\xff\xfe', b'', b'\x00']

for func in funcs:
    print(func(s))
# b'\xff\xfe\n\n\x00'
# b'\xff\xfe\n\n\x00'
# b'\xff\xfe\n\n\x00'
# b'\xff\xfe\n\n\x00'
# b'\xff\xfe\n\n\x00'
# b'\xff\xfe\n\n\x00'
Finally, unl_join_naive() relies only on the Python implementation of line splitting, which means it is a bit less obvious what would happen, but it may get better support for these kinds of issues in the future.
This method also drops the last newline if the input ends with one, so some extra code (which adds a typically small, constant offset to the timing) is required to overcome this behavior. A few suggestions to solve the issue, both demonstrated below, include:
- checking the last chars for the presence of newline markers, as in unl_join() (this is not an issue now given the current bytes.splitlines() implementation, but could become one in the future if a spurious \r\n happens to be the last char and the bytes.splitlines() behavior gets sensitive to that);
- appending any non-newline 7-bit ASCII char (e.g. \0) to the original input and removing the last element after the join(), as in unl_join_new() (this seems safer and faster than the previous option).
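A quick demonstration of the trailing newline handling:

s = b'alpha\r\n'
print(unl_join_naive(s))
# b'alpha'  (the trailing newline is lost)
print(unl_join(s))
# b'alpha\n'
print(unl_join_new(s))
# b'alpha\n'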
(EDITED: To add a simpler Cython solution and a Numba-based solution, and to update the timings.)
(EDITED: To add another Numba-based solution, requiring bytes and np.empty() support, and to update the timings.)