First, a note on the overall problem.
Any approach that loads 200M rows similar to the sample input you provided would require some 1.1 TB of memory.
While this is possible, it is certainly not ideal.
Therefore, I would not recommend going forward with this, but would rather look into approaches specifically designed for handling large datasets, e.g. HDF5.
Having said that, the problem at hand is not particularly complex, but passing through pandas
and eval()
is probably neither desirable nor beneficial.
The same could be said for cut
pre-processing into CSV files that are only marginally simpler to read.
Assuming that np.save()
will be equally fast, regardless of how the array is produced, we could say that the following function replicates well the processing in OP:
def process_tsv_OP(filepath="100-translation.embedded-3.tsv"):
    """Baseline loader that goes through ``eval()`` and a pandas DataFrame.

    Every line of the file is expected to hold four tab-separated fields:
    a hash, a score, and two vectors written as Python list literals.

    Returns a pair of NumPy arrays built by stacking the third and the
    fourth field of every line, respectively.
    """
    records = []
    with open(filepath, "r") as fin:
        for raw_line in fin:
            fields = raw_line.strip().split('\t')
            md5hash, score, vector1, vector2 = fields
            # NOTE(review): eval() runs arbitrary code found in the file; it
            # is kept here only because this function is the slow baseline
            # the other loaders are compared against.
            records.append({
                'md5_hash': md5hash,
                'score': float(score),
                'vector1': np.array(eval(vector1)),
                'vector2': np.array(eval(vector2)),
            })
    frame = pd.DataFrame(records)
    first = np.array(list(frame['vector1']))
    second = np.array(list(frame['vector2']))
    return first, second
This can be simplified by avoiding pandas
and "evil-eval()
" (and a fair amount of copying around in memory):
def text2row(text):
    """Turn a bracketed, comma-separated literal like "[1.0, 2.5]" into floats.

    The first and the last character are dropped unconditionally (they are
    assumed to be the enclosing brackets); what is left is split on commas
    and each piece is converted with ``float()``.
    """
    inner = text[1:-1]
    return list(map(float, inner.split(',')))
def process_tsv(filepath="100-translation.embedded-3.tsv"):
    """Load the two vector columns of a TSV file without pandas or eval().

    Each line must split into exactly four tab-separated fields; the first
    two are ignored and the last two are parsed with ``text2row()``.

    Returns a pair of NumPy arrays, one per vector column.
    """
    rows1, rows2 = [], []
    with open(filepath, "r") as in_file:
        for raw_line in in_file:
            fields = raw_line.strip().split('\t')
            _, _, text_r1, text_r2 = fields
            parsed1 = text2row(text_r1)
            parsed2 = text2row(text_r2)
            rows1.append(parsed1)
            rows2.append(parsed2)
    return np.array(rows1), np.array(rows2)
It is easy to show that the two produce the same output:
def same_res(x, y):
    """Tell whether two sequences of arrays agree pairwise.

    Items are paired with ``zip()`` (so surplus items of the longer input
    are ignored) and compared with ``np.allclose()``.
    """
    for left, right in zip(x, y):
        if not np.allclose(left, right):
            return False
    return True
same_res(process_tsv(), process_tsv_OP())
# True
but with substantially different timings:
%timeit process_tsv_OP()
# 1 loop, best of 5: 300 ms per loop
%timeit process_tsv()
# 10 loops, best of 5: 86.1 ms per loop
(on the sample input file obtained with: wget https://gist.githubusercontent.com/alvations/1f6f0b2501dc334db1e0038d36452f5d/raw/ee31c052a4dbda131df182f0237dbe6e5197dff2/100-translation.embedded-3.tsv
)
Preprocessing the input with cut
does not seem to be that beneficial:
!time cut -f3 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv
# real 0m0.184s
# user 0m0.102s
# sys 0m0.233s
!time cut -f4 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv
# real 0m0.208s
# user 0m0.113s
# sys 0m0.279s
%timeit np.genfromtxt('vector1.csv', delimiter=','); np.genfromtxt('vector2.csv', delimiter=',')
# 1 loop, best of 5: 130 ms per loop
and, while some time may be saved by using pd.read_csv()
:
%timeit pd.read_csv('vector1.csv').to_numpy(); pd.read_csv('vector2.csv').to_numpy()
# 10 loops, best of 5: 85.7 ms per loop
this seems to be even slower than the original approach on the provided dataset (although cut
itself may scale better for larger inputs).
If you really want to stick to the npy
file format for this, you may at least wish to append to your output in blocks.
While this is not supported well with NumPy alone, you could use NpyAppendArray
(see also here).
The modified process_tsv()
would look like:
import os
from npy_append_array import NpyAppendArray
def process_tsv_append(
    in_filepath="100-translation.embedded-3.tsv",
    out1_filepath="out1.npy",
    out2_filepath="out2.npy",
    append_every=10,
):
    """Stream the two vector columns of a TSV file into two ``.npy`` files.

    Rows are parsed with ``text2row()`` and buffered; every ``append_every``
    input lines the buffers are converted to arrays and appended to the
    outputs through ``NpyAppendArray``. Whatever is left in the buffers once
    the input is exhausted is appended as a final, shorter batch.

    Nothing is returned; the result lives in the two output files.
    """
    # clear output files so that this run does not extend existing ones
    for stale in (out1_filepath, out2_filepath):
        if os.path.isfile(stale):
            os.remove(stale)

    with open(in_filepath, "r") as in_file:
        with NpyAppendArray(out1_filepath) as npaa1:
            with NpyAppendArray(out2_filepath) as npaa2:
                batch1, batch2 = [], []
                for line_no, raw_line in enumerate(in_file, 1):
                    _, _, text_r1, text_r2 = raw_line.strip().split("\t")
                    row1 = text2row(text_r1)
                    row2 = text2row(text_r2)
                    batch1.append(row1)
                    batch2.append(row2)
                    if line_no % append_every != 0:
                        continue
                    # batch is full: flush both buffers and start over
                    npaa1.append(np.array(batch1))
                    npaa2.append(np.array(batch2))
                    batch1, batch2 = [], []
                if batch1:  # assumes both buffers have the same length
                    npaa1.append(np.array(batch1))
                    npaa2.append(np.array(batch2))
process_tsv_append()
v1 = np.load("out1.npy")
v2 = np.load("out2.npy")
same_res(process_tsv(), (v1, v2))
# True
All this can be sped up relatively blindly with Cython, but the speed-up seems to be marginal:
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import numpy as np
cpdef text2row_cy(text):
    """Cython counterpart of text2row(): "[1.0, 2.5]" -> [1.0, 2.5].

    Drops the first and last character, splits the rest on commas and
    converts every piece with float().
    """
    inner = text[1:-1]
    return list(map(float, inner.split(',')))
cpdef process_tsv_cy(filepath="100-translation.embedded-3.tsv"):
    """Cython counterpart of process_tsv().

    Reads the file line by line, ignores the first two tab-separated
    fields and parses the last two with text2row_cy().

    Returns a pair of NumPy arrays, one per vector column.
    """
    rows1, rows2 = [], []
    with open(filepath, "r") as in_file:
        for raw_line in in_file:
            fields = raw_line.strip().split('\t')
            _, _, text_r1, text_r2 = fields
            parsed1 = text2row_cy(text_r1)
            parsed2 = text2row_cy(text_r2)
            rows1.append(parsed1)
            rows2.append(parsed2)
    return np.array(rows1), np.array(rows2)
print(same_res(process_tsv_cy(), process_tsv_OP()))
# True
%timeit process_tsv_cy()
# 10 loops, best of 5: 72.4 ms per loop
Similarly, pre-allocating the arrays does not seem to be beneficial:
def text2row_out(text, out):
    """Parse a bracketed, comma-separated literal straight into ``out``.

    The first and last character of ``text`` are discarded, the remainder
    is split on commas, and the k-th value is stored as ``out[k]``.
    ``out`` is modified in place; nothing is returned.
    """
    pieces = text[1:-1].split(',')
    # map() is lazy, so values are converted one at a time as they are stored
    for slot, value in enumerate(map(float, pieces)):
        out[slot] = value
def process_tsv_alloc(filepath="100-translation.embedded-3.tsv"):
    """Load the two vector columns of a TSV file into pre-allocated arrays.

    The file is read twice through a single handle: the first pass counts
    the lines and takes the number of columns of each vector from the first
    line; the second pass parses every line with ``text2row_out()`` directly
    into the matching row of the output arrays.

    Lines are counted by iterating over the file rather than by counting
    newline characters in ``read()``, so the whole file is never held in
    memory at once and a last line without a trailing newline still gets
    its own row.

    Returns a pair of arrays of shape ``(num_lines, num_cols1)`` and
    ``(num_lines, num_cols2)``; for an empty file both have shape ``(0, 0)``.
    """
    with open(filepath, "r") as in_file:
        # first pass: number of rows, plus column counts from the first line
        num_lines = 0
        num_cols1 = 0
        num_cols2 = 0
        for line in in_file:
            if num_lines == 0:
                _, _, text_r1, text_r2 = line.strip().split('\t')
                num_cols1 = len(text_r1.split(","))
                num_cols2 = len(text_r2.split(","))
            num_lines += 1

        # second pass: populate arrays row by row
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        for i, line in enumerate(in_file):
            _, _, text_r1, text_r2 = line.strip().split('\t')
            text2row_out(text_r1, v1[i])
            text2row_out(text_r2, v2[i])
    return v1, v2
print(same_res(process_tsv_alloc(), process_tsv_OP()))
%timeit process_tsv_alloc()
# 10 loops, best of 5: 110 ms per loop
A significant reduction in the running time can be obtained with Numba (and possibly with Cython too) by rewriting everything to be closer to C. In order to make our code compatible with -- and beneficial to have it accelerated by -- Numba, we need to make significant modifications:
- open the file as bytes (no longer supporting UTF-8, which is not a significant issue for the problem at hand)
- read and process the file in blocks, which need to be sufficiently large, say in the order of 1M
- write all string handling functions by hand, notably the string-to-float conversion
import numpy as np
import numba as nb
@nb.njit
def bytes2int(text):
    """Parse an optionally signed run of ASCII digits into an integer.

    The bytes are scanned from the last one backwards, so the k-th digit
    met contributes ``digit * 10 ** k``. A leading "+" or "-" is accepted
    and excluded from the scan.

    Returns the signed value, or None when ``text`` is empty or contains
    anything other than digits after the optional sign.
    """
    zero = ord("0")
    nine = ord("9")
    size = len(text)
    ok = size > 0
    # work out the sign and where the backwards scan has to stop
    lower = -1  # exclusive lower bound of the scan
    sign = 1
    if ok:
        lead = text[0]
        if lead == ord("+"):
            lower = 0
        elif lead == ord("-"):
            sign = -1
            lower = 0
    # accumulate digits, least significant first
    total = 0
    power = 0
    pos = size - 1
    while pos > lower:
        c = text[pos]
        if c_is_digit(c, zero, nine) if False else (zero <= c <= nine):
            total += (c - zero) * 10 ** power
            power += 1
        else:
            ok = False
            break
        pos -= 1
    return sign * total if ok else None
@nb.njit
def bytes2float_helper(text):
    """Scan an optionally signed decimal literal (no exponent part).

    The bytes are walked from the end towards the start. Digits are
    accumulated into one integer as if the decimal point were not there,
    and the number of digits seen before reaching the point is remembered.

    Returns ``(signed_digits, sep_pos, valid)``:
      * ``signed_digits`` -- the signed integer made of all digits scanned;
      * ``sep_pos`` -- how many digits lie to the right of the point
        (0 when no point was met);
      * ``valid`` -- False when ``text`` is empty or an unexpected byte was
        found, in which case the other two values are only partial.
    """
    point = ord(".")
    zero = ord("0")
    nine = ord("9")
    size = len(text)
    ok = size > 0
    # work out the sign and where the backwards scan has to stop
    lower = -1  # exclusive lower bound of the scan
    sign = 1
    if ok:
        lead = text[0]
        if lead == ord("+"):
            lower = 0
        elif lead == ord("-"):
            sign = -1
            lower = 0
    # accumulate digits, least significant first
    frac_digits = 0
    total = 0
    power = 0
    pos = size - 1
    while pos > lower:
        c = text[pos]
        if zero <= c <= nine:
            total += (c - zero) * 10 ** power
            power += 1
        elif c == point and frac_digits == 0:
            frac_digits = power
        else:
            ok = False
            break
        pos -= 1
    return sign * total, frac_digits, ok
@nb.njit
def bytes2float(text):
    """Convert an ASCII decimal literal, optionally with exponent, to float.

    An exponent marker is searched from the end of ``text``: first "e",
    then, only if no "e" exists, "E". When a marker is found with at least
    one byte after it, the part after the marker is parsed with
    ``bytes2int()`` (an unparsable exponent counts as 0) and the part before
    it with ``bytes2float_helper()``. Otherwise the whole text goes to
    ``bytes2float_helper()``.

    Returns the float value, or None when the mantissa is not valid.
    """
    markers = b"eE"
    size = len(text)
    exp_pos = -1  # distance of the marker from the last byte, -1 if absent
    for marker in markers:
        for back in range(size):
            if text[size - 1 - back] == marker:
                exp_pos = back
                break
        if exp_pos > -1:
            break
    if exp_pos > 0:
        exponent = bytes2int(text[-exp_pos:])
        if exponent is None:
            exponent = 0
        mantissa, frac_digits, ok = bytes2float_helper(text[:-exp_pos - 1])
        # shifting by the fractional digits and by the exponent in one go
        result = mantissa / 10.0 ** (frac_digits - exponent) if ok else None
    else:
        mantissa, frac_digits, ok = bytes2float_helper(text)
        result = mantissa / 10.0 ** frac_digits if ok else None
    return result
@nb.njit
def btrim(text):
    """Strip ASCII whitespace from both ends of a bytes object.

    Space, tab, line feed and carriage return are removed from the start
    and from the end alike. (Previously the trailing side only removed
    spaces, so a token such as ``b"1.5\\r"`` kept its carriage return and
    then failed numeric parsing.)

    Returns the trimmed slice; it is empty when ``text`` holds nothing but
    whitespace.
    """
    space = ord(" ")
    tab = ord("\t")
    nl = ord("\n")
    cr = ord("\r")
    n = len(text)
    # advance past leading whitespace
    start = 0
    while start < n:
        c = text[start]
        if c == space or c == tab or c == nl or c == cr:
            start += 1
        else:
            break
    # retreat past trailing whitespace, never crossing `start`
    stop = n
    while stop > start:
        c = text[stop - 1]
        if c == space or c == tab or c == nl or c == cr:
            stop -= 1
        else:
            break
    return text[start:stop]
@nb.njit
def text2row_nb(text, sep, num_cols, out, curr_row):
    """Parse separator-delimited numbers from ``text`` into one row of ``out``.

    Parameters:
      * ``text`` -- bytes holding the numbers, without enclosing brackets;
      * ``sep`` -- byte value of the separator (e.g. ``ord(",")``);
      * ``num_cols`` -- accepted for symmetry with the callers, currently
        not used by this function;
      * ``out`` -- 2-D array that receives the values;
      * ``curr_row`` -- index of the row of ``out`` to fill.

    Every token is trimmed with ``btrim()`` and converted with
    ``bytes2float()``. The next token starts right after the separator:
    any blank that follows it is removed by ``btrim()``, so both ``"1, 2"``
    and ``"1,2"`` are read correctly. (Skipping two bytes, as was done
    before, dropped the first digit of a value not preceded by a space.)

    Nothing is returned; ``out`` is modified in place.
    """
    token_start = 0
    col = 0
    for i, c in enumerate(text):
        if c == sep:
            x = bytes2float(btrim(text[token_start:i]))
            out[curr_row, col] = x
            token_start = i + 1
            col += 1
    # the last token has no separator after it
    x = bytes2float(btrim(text[token_start:]))
    out[curr_row, col] = x
@nb.njit
def process_line(line, psep, sep, num_psep, num_cols1, num_cols2, out1, out2, curr_row):
    """Parse the last two bracketed vectors of one line into ``out1``/``out2``.

    Parameters:
      * ``line`` -- bytes of a single line, without its terminating newline
        (a leading newline is harmless: slicing is relative to the field
        separators);
      * ``psep`` -- byte value separating the fields (e.g. ``ord("\\t")``);
      * ``sep`` -- byte value separating the numbers inside a vector;
      * ``num_psep`` -- number of field separators a well-formed line has;
      * ``num_cols1``, ``num_cols2`` -- forwarded to ``text2row_nb()``;
      * ``out1``, ``out2`` -- 2-D arrays receiving the two vectors;
      * ``curr_row`` -- row of the outputs to fill.

    Only the positions of the last two field separators are needed, so they
    are tracked in two scalars instead of a per-line array. This avoids
    writing past a fixed-size buffer when a line holds more separators than
    expected, and reading uninitialised positions when it holds fewer.

    An empty ``line`` is ignored. A non-empty line whose separator count
    differs from ``num_psep`` (or is below 2) is treated as malformed and
    its row is filled with NaN in both outputs -- this assumes the outputs
    are floating-point arrays (TODO confirm for other callers).
    """
    if len(line) > 0:
        prev_pos = -1
        last_pos = -1
        found = 0
        for i, char in enumerate(line):
            if char == psep:
                prev_pos = last_pos
                last_pos = i
                found += 1
        if found == num_psep and found >= 2:
            # "+ 2" skips the separator and the opening bracket;
            # "- 1" / "-1" leave out the closing bracket
            text2row_nb(line[prev_pos + 2:last_pos - 1], sep, num_cols1, out1, curr_row)
            text2row_nb(line[last_pos + 2:-1], sep, num_cols2, out2, curr_row)
        else:
            out1[curr_row, :] = np.nan
            out2[curr_row, :] = np.nan
@nb.njit
def decode_block(block, psep, sep, num_lines, num_cols1, num_cols2, out1, out2, curr_row):
    """Decode up to ``num_lines`` newline-terminated lines found in ``block``.

    Each time a newline is met, the bytes from the previous newline (or
    from the start of the block) up to, but excluding, the current newline
    are handed to ``process_line()`` with a fixed separator count of 3, and
    ``curr_row`` is advanced by one. Scanning stops once ``num_lines`` lines
    have been handled.

    Returns ``(remainder, curr_row)``: the bytes following the position
    where scanning stopped, and the updated row index.
    """
    newline = ord("\n")
    size = len(block)
    line_start = 0
    done = 0
    pos = 0
    while pos < size:
        if block[pos] == newline:
            process_line(
                block[line_start:pos], psep, sep, 3,
                num_cols1, num_cols2, out1, out2, curr_row)
            done += 1
            # the next line starts at this newline; process_line() slices
            # relative to the field separators, so the extra byte is ignored
            line_start = pos
            curr_row += 1
        if done >= num_lines:
            break
        pos += 1
    return block[pos + 1:], curr_row
@nb.njit
def count_nl(block, start=0):
    """Count the newline bytes in ``block`` on top of an initial ``start``.

    Returns ``start`` plus the number of ``"\\n"`` bytes found, which lets a
    caller keep a running total across several blocks.
    """
    newline = ord("\n")
    total = start
    for pos in range(len(block)):
        if block[pos] == newline:
            total += 1
    return total
def process_tsv_block(filepath="100-translation.embedded-3.tsv", size=2 ** 18):
    """Load the two vector columns of a TSV file, reading it in binary blocks.

    Parameters:
      * ``filepath`` -- path of the input file;
      * ``size`` -- number of bytes requested from the file per read.

    The file is traversed twice through one handle. The first pass counts
    the rows; the number of columns of each vector is taken from the first
    line. The second pass feeds blocks to ``decode_block()``, carrying the
    bytes after the last newline of a block over to the next one.

    A final line that is not terminated by a newline is counted as a row
    and decoded as well (before, it was silently dropped because rows were
    counted by newlines only and the leftover bytes were never decoded).

    Returns a pair of arrays of shape ``(num_rows, num_cols1)`` and
    ``(num_rows, num_cols2)``; for an empty file both have shape ``(0, 0)``.
    """
    tab = ord("\t")
    comma = ord(",")
    with open(filepath, "rb") as in_file:
        # pass 1: count rows (newlines, plus one for an unterminated tail)
        num_rows = 0
        last_byte = b""
        while True:
            block = in_file.read(size)
            if not block:
                break
            num_rows = count_nl(block, num_rows)
            last_byte = block[-1:]
        if last_byte not in (b"", b"\n"):
            num_rows += 1
        if num_rows == 0:
            return np.empty((0, 0)), np.empty((0, 0))

        # count num columns from the first line
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split(b'\t')
        num_cols1 = len(text_r1.split(b","))
        num_cols2 = len(text_r2.split(b","))

        # pass 2: fill output arrays
        v1 = np.empty((num_rows, num_cols1))
        v2 = np.empty((num_rows, num_cols2))
        in_file.seek(0)
        remainder = b""
        curr_row = 0
        while True:
            block = in_file.read(size)
            if not block:
                break
            block = remainder + block
            lines_in_block = count_nl(block)
            if lines_in_block > 0:
                remainder, curr_row = decode_block(
                    block, tab, comma, lines_in_block,
                    num_cols1, num_cols2, v1, v2, curr_row)
            else:
                # no complete line yet: keep accumulating
                remainder = block
        if remainder:
            # the leftover never contains a newline here, so terminate it
            # ourselves and decode it as the final row
            remainder, curr_row = decode_block(
                remainder + b"\n", tab, comma, 1,
                num_cols1, num_cols2, v1, v2, curr_row)
    return v1, v2
The prize for all this work is a mere ~2x speed up over process_tsv()
:
print(same_res(process_tsv_block(), process_tsv_OP()))
# True
%timeit process_tsv_block()
# 10 loops, best of 5: 48.8 ms per loop