8

I have 2 text files (*.txt) that contain unique strings in the format:

udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi

The first file contains 50 million such lines (4.3 GB), and the second contains 1 million lines (112 MB). Each line consists of 40 characters, a : delimiter, and 45 more characters.

Task: get the unique values across the two files. That is, you need a csv or txt file with lines that are in the second file and which are not in the first.

I am trying to do this using vaex (Vaex):

import vaex

base_files = ['file1.txt']
for i, txt_file in enumerate(base_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')

check_files = ['file2.txt']
for i, txt_file in enumerate(check_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')

dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base, on='data', how='inner', inplace=True)
dv_result.export(path='result.csv')

As a result, I get the result.csv file with unique row values. But the verification process takes a very long time. In addition, it uses all available RAM and all CPU resources. How can this process be accelerated? What am I doing wrong? What can be done better? Is it worth using other libraries (pandas, dask) for this check, and would they be faster?


UPD 10.11.2020: So far, I have not found anything faster than the following option:

from io import StringIO


def read_lines(text):
    # Wrap a chunk of text in a file-like object and yield it line by line
    handle = StringIO(text)
    for line in handle:
        yield line.rstrip('\n')


def read_in_chunks(file_obj, chunk_size=10485760):
    # Read ~10 MB at a time, carrying the partial line at the end of each chunk
    # over to the next one so that no line is split at a chunk boundary
    tail = ''
    while True:
        data = file_obj.read(chunk_size)
        if not data:
            if tail:
                yield tail
            break
        data, _, tail = (tail + data).rpartition('\n')
        yield data


with open('check.txt', 'r', errors='ignore') as file_check:
    check_set = set(read_lines(file_check.read()))

with open(file='base.txt', mode='r', errors='ignore') as file_base:
    for idx, chunk in enumerate(read_in_chunks(file_base), 1):
        print(f'Checked [{idx * 10} MB]')
        for elem in read_lines(chunk):
            if elem in check_set:
                check_set.remove(elem)

print(f'Unique rows: [{len(check_set)}]')

UPD 11.11.2020: Thanks to @m9_psy for the tips on improving performance. It really is faster! Currently, the fastest way is:

from io import BytesIO

# Read the whole check file into memory and split it into raw byte lines.
# Each element keeps its trailing b'\n', matching the lines read from base.txt.
with open('check.txt', 'rb') as file_check:
    check_set = set(BytesIO(file_check.read()))

with open('base.txt', 'rb') as file_base:
    for line in file_base:
        if line in check_set:
            check_set.remove(line)

print(f'Unique rows: [{len(check_set)}]')

Is there a way to further speed up this process?

  • Could you elaborate a bit on which process takes a very long time? Is it the conversion from csv to hdf5? Or is it the join that is slow and uses memory? – Maarten Breddels Nov 05 '20 at 09:02
  • @Maarten Breddels Conversion to hdf5 is fast. Checking strings is slow. – Владимир Nov 06 '20 at 06:57
  • By checking you mean the join? – Maarten Breddels Nov 06 '20 at 09:44
  • @Maarten Breddels Not really. I just need to get the lines that are in the second file and that are not in the first file. The second file is always smaller than the first. – Владимир Nov 06 '20 at 09:50
  • Have you considered the alternative of simply using `awk` on the command line? If I understood your requirements correctly (i.e. return only lines which are present in file2.txt, but not already in file1.txt), [this answer](https://stackoverflow.com/a/4717415/565489) should do the job just fine. Note that you need to pipe the result to file, i.e. `awk 'FNR==NR {a[$0]++; next} !($0 in a)' file1.txt file2.txt > result.txt` – Asmus Nov 09 '20 at 13:23
  • Stuff that should work: sort the data beforehand and/or index the data to be able to join it without doing a full table scan. Compressing the data is also a good idea (fewer I/O and memory operations); columnar storage is a great option. – Iñigo González Nov 10 '20 at 09:00
  • There is a way to speed up the fastest StringIO version: 1) Use BytesIO and 'rb' reads, avoiding string decoding and operating on raw bytes. Funny thing: BytesIO is faster than splitting the whole string with .split(), which raises some questions. 2) Get rid of the generator in read_lines and return just handle; that removes the extra generator calls. 3) A questionable measure, but you can drop the rstrip call as well; it is fine for counting, because all lines (not just some) will have '\n' at the end. – m9_psy Nov 11 '20 at 01:09
  • @m9_psy Thanks for the tips on improving performance. It really is faster! I updated my question and added a new version of the code. – Владимир Nov 11 '20 at 07:47
  • This project does exactly this: https://github.com/nil0x42/duplicut – Bijan May 01 '21 at 22:48

4 Answers

8

I have a suspicion that the join operation requires n * m comparison operations, where n and m are the lengths of the two dataframes.

Also, there's an inconsistency between your description and your code:

  • "That is, you need a csv or txt file with lines that are in the second file and which are not in the first." ⟶ this means in dv_check but not in dv_base
  • dv_check.join(dv_base, on='data', how='inner', inplace=True) ⟶ this means in both dv_check and dv_base

Anyhow, an idea is to use a set, since checking for membership in a set has a time complexity of O(1) while checking membership in a list has a complexity of O(n). If you are familiar with the SQL world, this is equivalent to moving from a LOOP JOIN strategy to a HASH JOIN strategy:

# This will take care of removing the duplicates
base_set = set(dv_base['data'])
check_set = set(dv_check['data'])

# In `dv_check` but not `dv_base`
keys = check_set - base_set

# In both `dv_check` and `dv_base`
keys = check_set & base_set

This only gives you the keys that satisfy your condition. You still have to filter the two dataframes to get the other attributes.

Finished in 1 minute and 14 seconds on my 2014 iMac with 16GB of RAM.
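
One way to do that filtering, as a sketch that assumes vaex's Expression.isin method and that the key set fits in memory:

# in dv_check but not in dv_base
keys = check_set - base_set

# keep only the rows of dv_check whose value is in the key set
dv_result = dv_check[dv_check['data'].isin(list(keys))]
dv_result.export('result.csv')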

Code Different
  • I get an error: `TypeError: list indices must be integers or slices, not str` on the line `base_set = set(dv_base['data'])`. Screen: https://prnt.sc/vc0quw – Владимир Nov 02 '20 at 17:23
  • Replaced `base_set = set(dv_base['data'])` with `base_set = set(dv_base['data'].unique())` and it worked. Is it possible to somehow limit the memory usage in my script, for example when checking files larger than 100 GB? – Владимир Nov 02 '20 at 18:17
  • You can load `dv_base` in small chunks (say, 10M rows) at a time. You still have not resolved the inconsistency I mentioned above – Code Different Nov 09 '20 at 12:53
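
A sketch of that chunked idea, assuming vaex row slicing with df[i1:i2] and reusing the Expression.unique() call from the comment above (the 10M-row chunk size is arbitrary):

chunk = 10_000_000  # rows per slice; tune to the available RAM

# the small file fits in memory as a set
check_set = set(dv_check['data'].unique())

# walk the big dataframe slice by slice and subtract whatever it contains
for start in range(0, len(dv_base), chunk):
    check_set -= set(dv_base[start:start + chunk]['data'].unique())

print(f'Unique rows: [{len(check_set)}]')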
6

Let's generate a dataset to mimic your example:

import vaex
import numpy as np
N = 50_000_000  # 50 million rows for base
N2 = 1_000_000  # 1 million for check
M = 40+1+45     # chars for each string
N_dup = 10_000  # number of duplicate rows in the checks

s1 = np.random.randint(ord('a'), ord('z'), (N, M), np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'), ord('z'), (N2, M), np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2, N_dup, replace=False)
s2[dups] = s1[np.random.choice(N, N_dup, replace=False)]

# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')

Now we can left-join check onto base: rows of check that also occur in base get their value copied into s_other, while rows that do not occur in base end up with s_other missing. Dropping the rows that did not match keeps only the duplicates:

import vaex
base = vaex.open('/data/tmp/base.hdf5')
check = vaex.open('/data/tmp/check.hdf5')
# rows of check that are not in base have s_other missing after the left join
joined = check.join(base, on='s', how='left', rsuffix='_other')
# drop those, keeping only the rows that also occur in base
matched = joined.dropmissing(['s_other'])
# these are the 10,000 duplicated rows
matched
#      s                                                    s_other
0      'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfh...  'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfhb...
1      'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikh...  'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikhh...
2      'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhact...  'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhactn...
3      'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjk...  'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjku...
4      'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvb...  'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvbs...
...    ...                                                  ...
9,995  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwn...  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwno...
9,996  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogms...  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogmsb...
9,997  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknji...  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknjii...
9,998  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmv...  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmvo...
9,999  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfeb...  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfebi...
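
To get the rows of check that are not in base, which is what the question asks for, keep the rows where s_other is missing instead. A sketch, assuming vaex's ismissing expression method and column selection via df[['s']]:

# rows of check whose value never appears in base
not_in_base = joined[joined['s_other'].ismissing()]
# export only the original column
not_in_base[['s']].export('/data/tmp/result.csv')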
Maarten Breddels
4

Here is another approach. The check file is about 0.1 GB (fits in memory). The base file is up to 100 GB (so process it one line at a time).

Create test data and a generator function to import the data:

from io import StringIO

# test data for base (>50 million lines)
base_file = '''a
b
c
d
e
'''

# test data for check (1 million lines)
check_file = '''d
e
f
g
'''

def read_lines(text):
    '''Yield the lines of a string one at a time (stands in for reading a file).'''
    handle = StringIO(text)
    for line in handle:
        yield line.rstrip('\n')

Find elements in the check file only (check_set - base_set in @CodeDifferent's example)

check_set = {elem for elem in read_lines(check_file)}

for elem in read_lines(base_file):
    if elem in check_set:
        check_set.remove(elem)
print(check_set)
{'g', 'f'}

Find intersection (check_set & base_set in @CodeDifferent's example)

check_set = {elem for elem in read_lines(check_file)}

common_elements = set()
for elem in read_lines(base_file):
    if elem in check_set:
        common_elements.add(elem)
print(common_elements)
{'d', 'e'}

I think this approach would work best when (a) the base file is much larger than the check file and (b) the base file is too big for an in-memory data structure.
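
A sketch of the same idea applied to the real files from the question (check.txt and base.txt are the file names used in the question; result.txt is an assumed output path), streaming the base file in binary mode:

# build the in-memory set from the small file; splitlines() drops the newlines
with open('check.txt', 'rb') as f:
    check_set = set(f.read().splitlines())

# stream the big file and discard every line that also occurs in check.txt
with open('base.txt', 'rb') as f:
    for line in f:
        check_set.discard(line.rstrip(b'\n'))

# what remains is in check.txt but not in base.txt
with open('result.txt', 'wb') as f:
    f.write(b'\n'.join(sorted(check_set)))

print(f'Unique rows: [{len(check_set)}]')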

jsmart
3

Note! My original answer is wrong; @codedifferent is right. Here is my slightly different version, which may help somebody. I assume that the text file contains only one column, with a header named data (as the code below expects).

import pandas as pd

filepath_check = './data/checkfile.csv'
filepath_base = './data/basefile.csv'

# load the small data into memory
dfcheck = pd.read_csv(filepath_check)
dfcheck = set(dfcheck['data'])

# but load the big data in chunk
chunk_iter = pd.read_csv(filepath_base, chunksize=100000)

# for each chunk, remove intersect if any.
for chunk in chunk_iter:
    dfcheck = dfcheck - set(chunk['data'])
    print(len(dfcheck))

# write result
with open('./results.txt', 'w') as filehandler:
    for item in dfcheck:
        filehandler.write('%s\n'% item)

old answer

I recently faced a similar problem. My solution is to use Dask, but Vaex should be fine too.

import dask.dataframe as dd

base_file = dd.read_csv('./base_file.csv')
check_file = dd.read_csv('./check_file.csv')

base_file = base_file.set_index('data')
check_file = check_file.set_index('data')

base_file.to_parquet('./results/base_file', compression=None)
check_file.to_parquet('./results/check_file', compression=None)

base_file = dd.read_parquet('./results/base_file')
check_file = dd.read_parquet('./results/check_file')
merged = dd.merge(base_file, check_file, left_index=True, right_index=True)

# save to csv from dask dataframe
merged.to_csv('./results/dask_result.csv', single_file=True)

# or save to csv from pandas dataframe
pandas_merged = merged.compute()  # convert to pandas
pandas_merged.to_csv(...)
  1. Why set_index? It makes the join faster. https://docs.dask.org/en/latest/dataframe-best-practices.html#joins
  2. Why save to parquet just to read it back afterward? In my experience, even with Dask, reading the CSV directly takes more memory, and loading the parquet file is definitely faster. https://docs.dask.org/en/latest/dataframe-best-practices.html#store-data-in-apache-parquet-format. I have many save/load lines to persist the result after expensive operations such as join and set_index.
  3. If check_file is small enough, you can keep the whole check_file in memory with check_file = check_file.persist() after loading the file, or wherever necessary.
Sandi
  • Your way keeps duplicate lines: `merged` stores the lines that are in both files. And I need unique lines: those that are in the second file but not in the first file, i.e. `check_file - merged`. – Владимир Nov 10 '20 at 08:44
  • UPD: this works: `merged = base_file.merge(check_file, how='outer', indicator=True).loc[lambda x: x['_merge'] == 'right_only']` – Владимир Nov 10 '20 at 09:07
  • Glad something works for you. Pardon my wrong answer, but @codedifferent's answer is right and efficient. Does the text file contain only one column? – Sandi Nov 10 '20 at 09:29
  • Yes, it contains one column. I am looking for any way to get the unique strings from two text files quickly. It doesn't have to be pandas, dask or vaex. Do you know of other ways? – Владимир Nov 10 '20 at 10:42
  • I updated my question to add the fastest (currently) way to compare two files. – Владимир Nov 10 '20 at 10:50
  • This is a good solution, but not fast enough. The method I added to my question (StringIO) is about twice as fast. – Владимир Nov 10 '20 at 11:25