
From "How to get the pivot lines from two tab-separated files?", there is a quick way to pivot lines from two files using Unix commands.

If we have two pairs of files:

  • f1a and f1b
  • f2a and f2b

The goal is to provide a 3 column tab-separated file, that comprises:

  • f1a / f2a
  • f1b
  • f2b

Where f1a / f2a is a line that occurs in both f1a and f2a, and f1b / f2b are the corresponding lines from the paired files.
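For illustration (the contents below are hypothetical), suppose the paired files contain:

f1a          f1b          f2a          f2b
foo          1            bar          x
bar          2            qux          y
baz          3            foo          z

Then pivoted.tsv should contain one tab-separated row per line shared by f1a and f2a, together with the corresponding lines from f1b and f2b:

foo    1    z
bar    2    x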

I've tried the following, which works, but if the files are extremely large it takes a significant amount of memory to store the f1 and f2 dictionaries, e.g. files with billions of lines.

import sys
from tqdm import tqdm 

f1a, f1b, f2a, f2b = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]


# Read the first pair of files into memory.
with open(f1a) as fin_f1a, open(f1b) as fin_f1b:
  f1 = {s.strip().replace('\t', ' '): t.strip().replace('\t', ' ') for s, t in tqdm(zip(fin_f1a, fin_f1b))}

# Read the second pair of files into memory.
with open(f2a) as fin_f2a, open(f2b) as fin_f2b:
  f2 = {s.strip().replace('\t', ' '): t.strip().replace('\t', ' ') for s, t in tqdm(zip(fin_f2a, fin_f2b))}


with open('pivoted.tsv', 'w') as fout:
  for s in tqdm(f1.keys() & f2.keys()):
    print('\t'.join([s, f1[s], f2[s]]), end='\n', file=fout)

Is there a faster/better/easier way to achieve the same 3-column tab-separated file in Python? Are there libraries that can do such operations efficiently for huge files?


Using turicreate.SFrame, I could also do:

import sys

from turicreate import SFrame

f1a, f1b, f2a, f2b = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]

sf1a = SFrame.read_csv(f1a, delimiter='\0', header=False)
sf1b = SFrame.read_csv(f1b, delimiter='\0', header=False)

sf2a = SFrame.read_csv(f2a, delimiter='\0', header=False)
sf2b = SFrame.read_csv(f2b, delimiter='\0', header=False)

sf1 = sf1a.join(sf1b) 
sf2 = sf2a.join(sf2b)

sf = sf1.join(sf2, on='X1', how='left') 
sf.save('pivoted')
– alvas

4 Answers


Generic merge

The zip function does not store a whole copy of the iterables, so we can use it safely.

Assuming you have two iterables that are sorted in ascending order by the first column, you can join the two tables as follows.

def merge(t1, t2):
    # Sentinel object marking the end of an iterator.
    end = object()
    end_ = end, None
    a1, b1 = next(t1, end_)
    a2, b2 = next(t2, end_)
    while a1 is not end and a2 is not end:
        if a1 < a2:
            a1, b1 = next(t1, end_)
        elif a1 > a2:
            a2, b2 = next(t2, end_)
        else:
            # Keys match: emit the joined row and advance both iterators.
            yield a1, b1, b2
            a1, b1 = next(t1, end_)
            a2, b2 = next(t2, end_)

merge is invoked with two iterators and produces a third iterator; only one element of each iterator needs to be stored at a time.

>>> list(merge(iter([(0, 1), (1, 1), (3, 2)]),
...            iter([(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')])))
[(0, 1, 'a'), (1, 1, 'b'), (3, 2, 'd')]

Scanning and writing

In order to prevent the whole file from being stored, the scan function reads and yields one line at a time from each file.

def scan(fa, fb):
    for a, b in zip(fa, fb):
        a = a.strip().replace('\t', ' ')
        b = b.strip().replace('\t', ' ')
        yield a, b

def scan_by_name(fa, fb):
    with open(fa) as fha, open(fb) as fhb:
        yield from scan(fha, fhb)

Then you could apply it to your problem this way (untested, I don't have your files):

with open('pivoted.tsv', 'w') as fout:
    t1 = scan_by_name(f1a, f1b)
    t2 = scan_by_name(f2a, f2b)
    for row in merge(t1, t2):
        print('\t'.join(row), end='\n', file=fout)
– Bob
  • tqdm and zip won't load the file. The file is read through and the items are yielded. – alvas Mar 14 '21 at 10:21
  • You may be right. But in that case the package can only say how many iterations were performed, not the actual progress as a percentage. – Bob Mar 14 '21 at 10:34
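Regarding the percentage point in the comments above: a minimal sketch (the line pre-count is an assumption and costs one extra pass over the file) showing that tqdm can report a percentage when given an explicit total:

from tqdm import tqdm

def count_lines(path):
    # Extra pass over the file, only to obtain a total for tqdm.
    with open(path, 'rb') as fh:
        return sum(1 for _ in fh)

total = count_lines(f1a)  # f1a / f1b as in the question
with open(f1a) as fin_f1a, open(f1b) as fin_f1b:
    for s, t in tqdm(zip(fin_f1a, fin_f1b), total=total):
        pass  # process the pair here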

As leangaurav suggests, this can be done with Dask.

The advantage is that we get a solution that runs in a pool of threads (or processes) and reads the files in chunks (using little RAM) without having to manage this ourselves.

For example, we create some test data:

from string import ascii_lowercase
from random import choices

def rand_str(k=3):
    return " ".join("".join(choices(ascii_lowercase, k=k)) for _ in range(2))

N = 2_000

for file_name in ["example_a.txt", "example_b.txt"]:
    with open(file_name, "w") as f:
        for _ in range(N):
            line = f"{rand_str()} \t {rand_str()}\n"
            f.write(line)

Then we read the data with Dask, indicate which column will be the index, and do a merge:

from dask import compute
import dask.dataframe as dd

# this does not process anything yet 
df_a = dd.read_csv("example_a.txt", sep="\t", names=["pivot", "data"]).set_index("pivot")
df_b = dd.read_csv("example_b.txt", sep="\t", names=["pivot", "data"]).set_index("pivot")

# this is the heavy part
result = compute(dd.merge(df_a, df_b, left_index=True, right_index=True))
# save the output
result[0].to_csv("out.txt", sep="\t", header=False)
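
If finer control over the chunking is needed, dd.read_csv also accepts a blocksize argument (the value below is only illustrative):

# Read the file in ~64 MB blocks, one Dask partition per block.
df_a = dd.read_csv("example_a.txt", sep="\t", names=["pivot", "data"],
                   blocksize=64_000_000).set_index("pivot")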

Some tests on an old notebook for different N (timing only the compute step):

  • N = 500_000 -> 11s
  • N = 1_000_000 -> 25s
  • N = 2_000_000 -> 44s
  • N = 4_000_000 -> 1m33s
– Lucas

Dict comprehensions are evaluated immediately.

If you don't like streaming, try segmenting your data.

def oneSegment(first_letter):
    # Read only the entries whose key starts with first_letter into memory.
    with open(f1a) as fin_f1a, open(f1b) as fin_f1b:
        f1 = {s.strip().replace('\t', ' '): t.strip().replace('\t', ' ')
              for s, t in tqdm(zip(fin_f1a, fin_f1b))
              if s.strip().replace('\t', ' ').startswith(first_letter)}

    with open(f2a) as fin_f2a, open(f2b) as fin_f2b:
        f2 = {s.strip().replace('\t', ' '): t.strip().replace('\t', ' ')
              for s, t in tqdm(zip(fin_f2a, fin_f2b))
              if s.strip().replace('\t', ' ').startswith(first_letter)}

    # Append this segment's pivot rows to the output file.
    with open('pivoted.tsv', 'a') as fout:
        for s in tqdm(f1.keys() & f2.keys()):
            print('\t'.join([s, f1[s], f2[s]]), end='\n', file=fout)

oneSegment("a")
– obgnaw

This solution assumes you are running your Python code on Linux; it works by running a sequence of Linux commands via os.system. Some of the steps can be combined into a single command or run in parallel as separate processes to improve speed.
Tasks in the same set can be run in parallel: (1, 2, 3, 4), (5, 6), (7, 8), (9, 10, 11); a sketch of running one such set in parallel follows the code below.

import os

f1a = "f1a"
f1b = "f1b"
f2a = "f2a"
f2b = "f2b"

# replace \t with space
os.system(f"sed -i 's/\t/ /g' {f1a}") #1
os.system(f"sed -i 's/\t/ /g' {f1b}") #2
os.system(f"sed -i 's/\t/ /g' {f2a}") #3
os.system(f"sed -i 's/\t/ /g' {f2b}") #4

# join the pair of files with \t
os.system(f"paste {f1a} {f1b} > f1_t") #5 join f1a and f1b with \t delimiter
os.system(f"paste {f2a} {f2b} > f2_t") #6 join f1a and f1b with \t delimiter

# prepare data for join: sort the files
os.system(f"sort f1_t > f1") #7 
os.system(f"sort f2_t > f2") #8 

os.system(f"join f1 f2 -j 1 -t '\t'> f12") #9 lines common to both f1, f2: -j 1
os.system(f"join f1 f2 -v 1 -j 1 -t '\t'> f11") #10 lines present only in f1 : -v 1
os.system(f"join f1 f2 -v 2 -j 1 -t '\t'> f22") #11 lines present only in f2 : -v 2

os.system(f"cat f12  f11 f22 > f") #12 join into final result f

Since the question mentions Python and not the platform, for a platform-neutral approach look at Dask.

This answer solves part of the problem, which is combining large files. Joining the sets of files still needs to be figured out, which should be possible with Dask as well. Alternatively, the pre-processing can be done via the code above (or even outside of code) and the merge can be done with Dask. Also check this answer.

When working with large chunks of data, sorting usually makes things better.
See the details about indexes in this Dask doc.
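
A minimal sketch (untested) of that combination, reusing the sorted, tab-joined files f1 and f2 produced above, where set_index sorts and partitions the data by the pivot column:

import dask.dataframe as dd

# f1 and f2 are the tab-joined, sorted files produced by the paste/sort steps above.
df1 = dd.read_csv("f1", sep="\t", names=["pivot", "b1"]).set_index("pivot")
df2 = dd.read_csv("f2", sep="\t", names=["pivot", "b2"]).set_index("pivot")

merged = dd.merge(df1, df2, left_index=True, right_index=True)
merged.to_csv("pivoted.tsv", sep="\t", header=False, single_file=True)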

– leangaurav