3

I have a data file that looks like this:

58f0965a62d62099f5c0771d35dbc218        0.868632614612579       [0.028979932889342308, 0.004080114420503378, 0.03757167607545853]       [-0.006008833646774292, -0.010409083217382431, 0.01565541699528694]
36f7859ce47417470bc28384694f0ac4        0.835115909576416       [0.026130573824048042, -0.00358427781611681, 0.06635218113660812]       [-0.06970945745706558, 0.03816794604063034, 0.03491008281707764]
59f7d617bb662155b0d49ce3f27093ed        0.907200276851654       [0.009903069585561752, -0.009721670299768448, 0.0151780480518937]       [-0.03264783322811127, 0.0035394825972616673, -0.05089104175567627]

where the columns are respectively

  • an md5 hash of the data point
  • a target float output
  • an array of floats that I want to read into a np.array object
  • another array of floats that I want to read into a np.array object

I've been reading the file as follows to create numpy arrays for the two columns of float arrays:

import numpy as np
from tqdm import tqdm

import pandas as pd

lol = []
with open('data.tsv') as fin:
    for line in tqdm(fin):
        md5hash, score, vector1, vector2 = line.strip().split('\t')
        row = {'md5_hash': md5hash, 'score':float(score), 
               'vector1': np.array(eval(vector1)), 
               'vector2': np.array(eval(vector2))
              }
        lol.append(row)
        
df = pd.DataFrame(lol)

training_vector1 = np.array(list(df['vector1']))
# Save the training vectors.
np.save('vector1.npz', training_vector1)

training_vector2 = np.array(list(df['vector2']))
# Save the training vectors.
np.save('vector2.npz', training_vector2)

While this works for a small dataset, the actual dataset has many more floats in each array and close to 200 million rows. Here's a sample of 100 rows: https://gist.github.com/1f6f0b2501dc334db1e0038d36452f5d

How can I efficiently read the array columns in the TSV file into a single npz file per column?

SultanOrazbayev
alvas
  • What if I do `cut -f3 data.tsv`? Can I read that file easily with any numpy or pandas read functions? – alvas Jul 28 '22 at 04:30
  • Are you still looking for a solution? (other than the one you posted below) – SultanOrazbayev Aug 01 '22 at 11:31
  • Yes, if there's anything more efficient than the solution below. Note the 200M-row scale. – alvas Aug 02 '22 at 13:57
  • Are you running out of memory, or is it a performance problem? Do you know the exact number of rows beforehand? Since a npz file is just a zipfile, sequential read->write is easily possible, but this won't help with performance. – max9111 Aug 02 '22 at 14:45
  • I don't have the exact number of rows beforehand but if it's needed, I can run a count before that. – alvas Aug 02 '22 at 15:45

4 Answers

3

First, a note on the overall problem. Any approach that loads 200M rows similar to the sample input you provided would require some 1.1 TB of memory. While this is possible, it is certainly not ideal. Therefore, I would not recommend going forward with this, but rather look into approaches specifically designed for handling large datasets, e.g. HDF5.
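
For illustration, here is a minimal sketch of what the HDF5 route could look like with h5py (the file and dataset names are just placeholders, and the vector width of 768 is taken from the sample data); only buffer_rows lines are held in memory at a time:

import h5py
import numpy as np


def tsv_to_hdf5(tsv_path="data.tsv", h5_path="vectors.h5", width=768, buffer_rows=10_000):
    with open(tsv_path, "r") as fin, h5py.File(h5_path, "w") as h5:
        # resizable datasets, so the total number of rows need not be known upfront
        d1 = h5.create_dataset("vector1", shape=(0, width), maxshape=(None, width), dtype="float64")
        d2 = h5.create_dataset("vector2", shape=(0, width), maxshape=(None, width), dtype="float64")
        buf1, buf2 = [], []
        for line in fin:
            _, _, text1, text2 = line.strip().split("\t")
            buf1.append([float(x) for x in text1[1:-1].split(",")])
            buf2.append([float(x) for x in text2[1:-1].split(",")])
            if len(buf1) >= buffer_rows:
                for d, buf in ((d1, buf1), (d2, buf2)):
                    n_old = d.shape[0]
                    d.resize(n_old + len(buf), axis=0)
                    d[n_old:] = np.array(buf)
                buf1, buf2 = [], []
        if buf1:  # flush the remaining rows
            for d, buf in ((d1, buf1), (d2, buf2)):
                n_old = d.shape[0]
                d.resize(n_old + len(buf), axis=0)
                d[n_old:] = np.array(buf)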

Having said that, the problem at hand is not particularly complex, but going through pandas and eval() is probably neither desirable nor beneficial.

The same could be said for pre-processing with cut into CSV files, which are only marginally simpler to read.

Assuming that np.save() will be equally fast regardless of how the array is produced, we could say that the following function replicates the processing in the OP well:

def process_tsv_OP(filepath="100-translation.embedded-3.tsv"):  
    lol = []
    with open(filepath, "r") as fin:
        for line in fin:
            md5hash, score, vector1, vector2 = line.strip().split('\t')
            row = {'md5_hash': md5hash, 'score':float(score), 
                'vector1': np.array(eval(vector1)), 
                'vector2': np.array(eval(vector2))
                }
            lol.append(row)
    df = pd.DataFrame(lol)
    training_vector1 = np.array(list(df['vector1']))
    training_vector2 = np.array(list(df['vector2']))
    return training_vector1, training_vector2

This can be simplified by avoiding pandas and "evil-eval()" (and a fair amount of copying around in memory):

def text2row(text):
    text = text[1:-1]
    return [float(x) for x in text.split(',')]


def process_tsv(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
    v1 = np.array(v1)
    v2 = np.array(v2)
    return v1, v2

It is easy to show that the two produce the same output:

def same_res(x, y):
    return all(np.allclose(i, j) for i, j in zip(x, y))


same_res(process_tsv(), process_tsv_OP())
# True

but with substantially different timings:

%timeit process_tsv_OP()
# 1 loop, best of 5: 300 ms per loop
%timeit process_tsv()
# 10 loops, best of 5: 86.1 ms per loop

(on the sample input file obtained with: wget https://gist.githubusercontent.com/alvations/1f6f0b2501dc334db1e0038d36452f5d/raw/ee31c052a4dbda131df182f0237dbe6e5197dff2/100-translation.embedded-3.tsv)


Preprocessing the input with cut does not seem to be that beneficial:

!time cut -f3 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv
# real  0m0.184s
# user  0m0.102s
# sys   0m0.233s
!time cut -f4 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv
# real  0m0.208s
# user  0m0.113s
# sys   0m0.279s
%timeit np.genfromtxt('vector1.csv', delimiter=','); np.genfromtxt('vector2.csv', delimiter=',')
# 1 loop, best of 5: 130 ms per loop

and, while some time may be saved by using pd.read_csv():

%timeit pd.read_csv('vector1.csv').to_numpy(); pd.read_csv('vector2.csv').to_numpy()
# 10 loops, best of 5: 85.7 ms per loop

this seems to be even slower than the original approach on the provided dataset once the cut pre-processing time is included (although cut itself may scale better for larger inputs).


If you really want to stick to the npy file format for this, you may at least wish to append to your output in blocks. While this is not supported well with NumPy alone, you could use NpyAppendArray (see also here). The modified process_tsv() would look like:

import os
from npy_append_array import NpyAppendArray


def process_tsv_append(
    in_filepath="100-translation.embedded-3.tsv",
    out1_filepath="out1.npy",
    out2_filepath="out2.npy",
    append_every=10,
):
    # clear output files
    for filepath in (out1_filepath, out2_filepath):
        if os.path.isfile(filepath):
            os.remove(filepath)
    with \
            open(in_filepath, "r") as in_file, \
            NpyAppendArray(out1_filepath) as npaa1, \
            NpyAppendArray(out2_filepath) as npaa2:
        v1 = []
        v2 = []
        for i, line in enumerate(in_file, 1):
            _, _, text_r1, text_r2 = line.strip().split("\t")
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
            if i % append_every == 0:
                npaa1.append(np.array(v1))
                npaa2.append(np.array(v2))
                v1 = []
                v2 = []
        if len(v1) > 0:  # assumes len(v1) == len(v2)
            npaa1.append(np.array(v1))
            npaa2.append(np.array(v2))


process_tsv_append()

v1 = np.load("out1.npy")
v2 = np.load("out2.npy")
same_res(process_tsv(), (v1, v2))
# True

All this can be sped up relatively blindly with Cython, but the speed-up seems to be marginal:

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


import numpy as np


cpdef text2row_cy(text):
    return [float(x) for x in text[1:-1].split(',')]


cpdef process_tsv_cy(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row_cy(text_r1)
            r2 = text2row_cy(text_r2)
            v1.append(r1)
            v2.append(r2)
    v1 = np.array(v1)
    v2 = np.array(v2)
    return v1, v2
print(same_res(process_tsv_cy(), process_tsv_OP()))
# True
%timeit process_tsv_cy()
# 10 loops, best of 5: 72.4 ms per loop

Similarly, pre-allocating the arrays does not seem to be beneficial:

def text2row_out(text, out):
    for i, x in enumerate(text[1:-1].split(',')):
        out[i] = float(x)


def process_tsv_alloc(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        # num lines
        num_lines = in_file.read().count("\n")
        # num cols
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split('\t')
        num_cols1 = len(text_r1.split(","))
        num_cols2 = len(text_r2.split(","))
        # populate arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        for i, line in enumerate(in_file):
            _, _, text_r1, text_r2 = line.strip().split('\t')
            text2row_out(text_r1, v1[i])
            text2row_out(text_r2, v2[i])
    return v1, v2


print(same_res(process_tsv_alloc(), process_tsv_OP()))
%timeit process_tsv_alloc()
# 10 loops, best of 5: 110 ms per loop

A significant reduction in the running time can be obtained with Numba (and possibly with Cython too) by rewriting everything to be closer to C. To make our code compatible with Numba, and to actually benefit from its acceleration, we need to make significant modifications:

  • open the file as bytes (no longer supporting UTF-8, which is not a significant issue for the problem at hand)
  • read and process the file in blocks, which needs to be sufficiently large, say in the order of 1M
  • write all string handling functions by hand, notably the string-to-float conversion

import numpy as np
import numba as nb


@nb.njit
def bytes2int(text):
    c_min = ord("0")
    c_max = ord("9")

    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        else:
            valid = False
            break
    return sign * number if valid else None


@nb.njit
def bytes2float_helper(text):
    sep = ord(".")
    c_min = ord("0")
    c_max = ord("9")

    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    sep_pos = 0
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        elif c == sep and sep_pos == 0:
            sep_pos = j
        else:
            valid = False
            break
    return sign * number, sep_pos, valid


@nb.njit
def bytes2float(text):
    exp_chars = b"eE"
    exp_pos = -1
    for exp_char in exp_chars:
        for i, c in enumerate(text[::-1]):
            if c == exp_char:
                exp_pos = i
                break
        if exp_pos > -1:
            break
    if exp_pos > 0:
        exp_number = bytes2int(text[-exp_pos:])
        if exp_number is None:
            exp_number = 0
        number, sep_pos, valid = bytes2float_helper(text[:-exp_pos-1])
        result = number / 10.0 ** (sep_pos - exp_number) if valid else None
    else:
        number, sep_pos, valid = bytes2float_helper(text)
        result = number / 10.0 ** sep_pos if valid else None
    return result


@nb.njit
def btrim(text):
    space = ord(" ")
    tab = ord("\t")
    nl = ord("\n")
    cr = ord("\r")
    start = 0
    stop = 0
    for c in text:
        if c == space or c == tab or c == nl or c == cr:
            start += 1
        else:
            break
    for c in text[::-1]:
        if c == space:
            stop += 1
        else:
            break
    if start == 0 and stop == 0:
        return text
    elif stop == 0:
        return text[start:]
    else:
        return text[start:-stop]


@nb.njit
def text2row_nb(text, sep, num_cols, out, curr_row):
    last_i = 0
    j = 0
    for i, c in enumerate(text):
        if c == sep:
            x = bytes2float(btrim(text[last_i:i]))
            out[curr_row, j] = x
            last_i = i + 2
            j += 1
    x = bytes2float(btrim(text[last_i:]))
    out[curr_row, j] = x


@nb.njit
def process_line(line, psep, sep, num_psep, num_cols1, num_cols2, out1, out2, curr_row):
    if len(line) > 0:
        psep_pos = np.empty(num_psep, dtype=np.int_)
        j = 0
        for i, char in enumerate(line):
            if char == psep:
                psep_pos[j] = i
                j += 1
        text2row_nb(line[psep_pos[-2] + 2:psep_pos[-1] - 1], sep, num_cols1, out1, curr_row)
        text2row_nb(line[psep_pos[-1] + 2:-1], sep, num_cols2, out2, curr_row)


@nb.njit
def decode_block(block, psep, sep, num_lines, num_cols1, num_cols2, out1, out2, curr_row):
    nl = ord("\n")
    last_i = 0
    i = j = 0
    for c in block:
        if c == nl:
            process_line(block[last_i:i], psep, sep, 3, num_cols1, num_cols2, out1, out2, curr_row)
            j += 1
            last_i = i
            curr_row += 1
        if j >= num_lines:
            break
        i += 1
    return block[i + 1:], curr_row


@nb.njit
def count_nl(block, start=0):
    nl = ord("\n")
    for c in block:
        if c == nl:
            start += 1
    return start


def process_tsv_block(filepath="100-translation.embedded-3.tsv", size=2 ** 18):
    with open(filepath, "rb") as in_file:
        # count newlines
        num_lines = 0
        while True:
            block = in_file.read(size)
            if block:
                num_lines = count_nl(block, num_lines)
            else:
                break

        # count num columns
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split(b'\t')
        num_cols1 = len(text_r1.split(b","))
        num_cols2 = len(text_r2.split(b","))
        
        # fill output arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        remainder = b""
        curr_row = 0
        while True:
            block = in_file.read(size)
            if block:
                block = remainder + block
                num_lines = count_nl(block)
                if num_lines > 0:
                    remainder, curr_row = decode_block(block, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                else:
                    remainder = block
            else:
                num_lines = count_nl(remainder)
                if num_lines > 0:
                    remainder, curr_row = decode_block(remainder, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                break
    return v1, v2

The prize for all this work is a mere ~2x speed-up over process_tsv():

print(same_res(process_tsv_block(), process_tsv_OP()))
# True
%timeit process_tsv_block()
# 10 loops, best of 5: 48.8 ms per loop
norok2
2

Cut the 3rd column and remove the first and last square brackets:

cut -f3 data.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv

Repeat the same for vector 2:

cut -f4 data.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv

Read each CSV into numpy in Python and save it to an npy file:

import numpy as np

np.save('vector1.npy', np.genfromtxt('vector1.csv', delimiter=','))
np.save('vector2.npy', np.genfromtxt('vector2.csv', delimiter=','))
alvas
2

The other answers are good; the version below is a variation that uses dask. Since the original data is in text format, let's use the dask.bag API.

First, import modules and define a utility function:

from dask.array import from_delayed, from_npy_stack, to_npy_stack, vstack
from dask.bag import read_text
from numpy import array, nan, stack

def process_line(line):
    """Utility function adapted from the snippet in the question."""
    md5hash, score, vector1, vector2 = line.strip().split("\t")
    row = {
        "md5_hash": md5hash,
        "score": float(score),
        "vector1": array(eval(vector1)),
        "vector2": array(eval(vector2)),
    }
    return row

Next, create a bag:

bag = read_text("100-translation.embedded-3.tsv", blocksize="1mb").map(process_line)

Since the sample snippet is small, to simulate 'big data' let's pretend that we can only load '1mb' at a time. This should create 3 partitions in the bag, which can be checked as shown below.
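
A quick sanity check on the partitioning (the exact count depends on the blocksize and the file size):

print(bag.npartitions)
# 3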

Next, isolate the vectors/arrays and convert them to dask.arrays:

# create delayed versions of the arrays
a1 = bag.pluck("vector1").map_partitions(stack).to_delayed()
a2 = bag.pluck("vector2").map_partitions(stack).to_delayed()

# convert the delayed objects to dask array
A1 = vstack(
    [from_delayed(a, shape=(nan, 768), dtype="float") for a in a1],
    allow_unknown_chunksizes=True,
)
A2 = vstack(
    [from_delayed(a, shape=(nan, 768), dtype="float") for a in a2],
    allow_unknown_chunksizes=True,
)

Now, we can save the arrays as npy stacks:

to_npy_stack("_A1", A1)
to_npy_stack("_A2", A2)

Note that this processing is not ideal, since the workers will pass over the data twice (once for each array), but with the current API I couldn't think of a better way.

Furthermore, note that the npy stacks preserve the 'unknown' chunks as metadata, even though all the relevant information was computed. This is something that could be improved in the dask codebase, but for now the easiest fix is to load the data again, compute the chunk sizes, rechunk (to get a nice, grid-like structure) and save again:

# rechunk into regular-sized format
A1 = from_npy_stack("_A1")
A1.compute_chunk_sizes()
A1 = A1.rechunk(chunks=(40, 768))
to_npy_stack("A1_final", A1)

# rechunk into regular-sized format
A2 = from_npy_stack("_A2")
A2.compute_chunk_sizes()
A2 = A2.rechunk(chunks=(40, 768))
to_npy_stack("A2_final", A2)

Of course, on the real dataset you'd want to use bigger chunks. Also, the final save operation does not have to be to numpy stacks; depending on your needs, the result could now be stored as an HDF5 or zarr array, as sketched below.
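
For example, a minimal sketch of those alternative save steps (the output paths are placeholders; zarr and h5py would need to be installed):

import dask.array as da

# save as zarr stores (chunked and compressed on disk)
A1.to_zarr("A1.zarr")
A2.to_zarr("A2.zarr")

# or save both arrays into a single HDF5 file
da.to_hdf5("vectors.h5", {"/vector1": A1, "/vector2": A2})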

SultanOrazbayev
1

If the output format is changed to a raw binary file then the input file can be processed line by line without storing the complete result in RAM.

import numpy as np

fh_in = open('data.tsv')
fh_vec1 = open('vector1.bin', 'wb')
fh_vec2 = open('vector2.bin', 'wb')

linecount = 0
for line in fh_in:
    hash_, score, vec1, vec2 = line.strip().split('\t')
    np.fromstring(vec1.strip('[]'), sep=',').tofile(fh_vec1)
    np.fromstring(vec2.strip('[]'), sep=',').tofile(fh_vec2)
    linecount += 1

A raw binary file doesn't store any info about dtype, shape, or byte order. For loading it back into an array you can use np.fromfile or np.memmap and then call .reshape(linecount, -1) on it.
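
For example, a minimal sketch of loading the result back (this assumes the default float64 dtype written by tofile and the linecount computed in the loop above):

import numpy as np

# memory-map the raw binary file and restore the 2D shape
vec1 = np.memmap('vector1.bin', dtype=np.float64, mode='r').reshape(linecount, -1)

# or read it fully into RAM
vec2 = np.fromfile('vector2.bin', dtype=np.float64).reshape(linecount, -1)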

user7138814
  • This should probably either include some closing of the files or be rewritten using context managers (i.e. `with ...`). The risk is some non-finalized flushing which could corrupt the end of the `vector1.bin` and `vector2.bin` files. – norok2 Aug 05 '22 at 10:06