
I want to merge multiple files with a single file (f1.txt) based on matches in two columns. I can do this in pandas, but it reads everything into memory, which can get big really fast. I am thinking a line-by-line approach will avoid loading everything into memory; pandas is not an option now anyway. How do I perform the merge while filling in null for cells where a row has no match with f1.txt?

Below, I used a dictionary, though I am not sure it will fit in memory either, and I can't find a way to add null where a row of f1.txt has no match in one of the other files. There could be as many as 1000 other files. Runtime does not matter as long as I do not read everything into memory.

FILES (tab-delimited)

f1.txt
A B  num  val scol
1 a1 1000 2 3
2 a2 456 7 2
3 a3 23 2 7
4 a4 800 7 3
5 a5 10 8 7

a1.txt
A B num val scol fcol dcol
1 a1 1000 2 3 0.2 0.77
2 a2 456 7 2 0.3 0.4
3 a3 23 2 7 0.5 0.6
4 a4 800 7 3 0.003 0.088

a2.txt
A B num val scol fcol2 dcol1
2 a2 456 7 2 0.7 0.8
4 a4 800 7 3 0.9 0.01
5 a5 10 8 7 0.03 0.07

Current Code

import os
import csv
m1 = os.getcwd() + '/f1.txt'
files_to_compare = [i for i in os.listdir('dir')]
dictionary = dict()   # key -> value columns collected from the other files
dictionary1 = dict()  # key -> the full row from f1.txt
with open(m1, 'rt') as a:
    reader1 = csv.reader(a, delimiter='\t')
    for x in files_to_compare:
        with open(os.getcwd() + '/dir/' + x, 'rt') as b:
            reader2 = csv.reader(b, delimiter='\t')
            for row1 in list(reader1):
                dictionary[row1[0]] = list()
                dictionary1[row1[0]] = list(row1)
            for row2 in list(reader2):
                try:
                    dictionary[row2[0]].append(row2[5:])
                except KeyError:
                    pass
print(dictionary)
print(dictionary1)

What I am trying to achieve is similar to using: `df.merge(df1, on=['A','B'], how='left').fillna('null')`
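
For the two sample files above, the pandas version I am trying to replicate would be roughly this (shown only to make the target behaviour concrete):

import pandas as pd

# small-scale illustration only; this is exactly what I want to avoid at scale
df = pd.read_csv('f1.txt', sep='\t')
df1 = pd.read_csv('a1.txt', sep='\t')
merged = df.merge(df1, on=['A', 'B'], how='left').fillna('null')
print(merged)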

Current result
{'A': [['fcol1', 'dcol1'], ['fcol', 'dcol']], '1': [['0.2', '0.77']], '2': [['0.7', '0.8'], ['0.3', '0.4']], '3': [['0.5', '0.6']], '4': [['0.9', '0.01'], ['0.003', '0.088']], '5': [['0.03', '0.07']]}

{'A': ['A', 'B', 'num', 'val', 'scol'], '1': ['1', 'a1', '1000', '2', '3'], '2': ['2', 'a2', '456', '7', '2'], '3': ['3', 'a3', '23', '2', '7'], '4': ['4', 'a4', '800', '7', '3'], '5': ['5', 'a5', '10', '8', '7']}

Desired result
{'A': [['fcol1', 'dcol1'], ['fcol', 'dcol']], '1': [['0.2', '0.77'],['null', 'null']], '2': [['0.7', '0.8'], ['0.3', '0.4']], '3': [['0.5', '0.6'],['null', 'null']], '4': [['0.9', '0.01'], ['0.003', '0.088']], '5': [['null', 'null'],['0.03', '0.07']]}

{'A': ['A', 'B', 'num', 'val', 'scol'], '1': ['1', 'a1', '1000', '2', '3'], '2': ['2', 'a2', '456', '7', '2'], '3': ['3', 'a3', '23', '2', '7'], '4': ['4', 'a4', '800', '7', '3'], '5': ['5', 'a5', '10', '8', '7']}

My final intent is to write the dictionary to a text file. I do not know how much memory it will use or whether it will even fit in memory. If there is a better way without using pandas, that would be nice; otherwise, how do I make the dictionary approach work?
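
The writing step itself should be simple enough, something along these lines (a sketch; merged_out.txt is just a placeholder name):

import csv

# one tab-separated row per key: the f1.txt columns first, then the
# collected (or null-filled) columns from the other files
with open('merged_out.txt', 'w', newline='') as out:
    writer = csv.writer(out, delimiter='\t')
    for key, groups in dictionary.items():
        row = dictionary1[key] + [value for group in groups for value in group]
        writer.writerow(row)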

DASK ATTEMPT:

import dask.dataframe as dd    
directory = 'input_dir/'
first_file = dd.read_csv('f1.txt', sep='\t')
df = dd.read_csv(directory + '*.txt', sep='\t')
df2 = dd.merge(first_file, df, on=['A', 'B'])

I kept getting ValueError: Metadata mismatch found in 'from_delayed'

+-----------+---------+----------+
| Column    | Found   | Expected |
+-----------+---------+----------+
| fcol      | int64   | float64  |
+-----------+---------+----------+

I googled and found similar complaints but could not fix it, which is why I decided to try the dictionary approach. I checked my files and all dtypes seem consistent. My dask version is 2.9.1.
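
The suggestions in those threads mostly amount to pinning the dtype that dask infers inconsistently when reading, something like this:

import dask.dataframe as dd

directory = 'input_dir/'
first_file = dd.read_csv('f1.txt', sep='\t')
# suggestion from similar reports: pin the column named in the error
# so every partition agrees on its dtype
df = dd.read_csv(directory + '*.txt', sep='\t', dtype={'fcol': 'float64'})
df2 = dd.merge(first_file, df, on=['A', 'B'])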

Starter
    Does this answer your question? [Best way to join two large datasets in Pandas](https://stackoverflow.com/questions/37756991/best-way-to-join-two-large-datasets-in-pandas) – Juan Estevez Jan 02 '20 at 12:37
  • Thanks but it doesn't answer my question. I tried that answer before which is why I am reverting to using dictionaries to see if it will help. Remember I have multiple files that I wish to merge with the first file. Dask is also giving me some issues and I do not intend to go the mysql route nor increase ram – Starter Jan 02 '20 at 15:11
  • It might be worth including your attempt using dask. – de1 Jan 02 '20 at 16:28
  • @de1 I added my dask try and the error it generated – Starter Jan 03 '20 at 19:45

1 Answer


If you want a hand-made solution, you can look at `heapq.merge` and `itertools.groupby`. This assumes your files are sorted by the first two columns (the key).
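
As a quick sketch of how those two combine (merge interleaves already-sorted iterables by a key, and groupby then batches rows sharing the same key):

from heapq import merge
from itertools import groupby

a = [('1', 'x'), ('3', 'x')]   # both inputs already sorted by the key
b = [('1', 'y'), ('2', 'y')]
for key, rows in groupby(merge(a, b, key=lambda r: r[0]), key=lambda r: r[0]):
    print(key, list(rows))
# 1 [('1', 'x'), ('1', 'y')]
# 2 [('2', 'y')]
# 3 [('3', 'x')]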

I made a simple example that merges and groups the files and produces two output files instead of dictionaries, so (almost) nothing is stored in memory; everything is read from and written to disk:

from heapq import merge
from itertools import groupby

first_file_name = 'f1.txt'
other_files = ['a1.txt', 'a2.txt']

def get_lines(filename):
    # yield each row prefixed with its source filename, so we can later
    # tell which file a merged row came from
    with open(filename, 'r') as f_in:
        for line in f_in:
            yield [filename, *line.strip().split()]

def get_values(lines):
    # yield the matched rows, then pad with 'null' forever so the caller
    # never runs out of values
    for line in lines:
        yield line
    while True:
        yield ['null']

opened_files = [get_lines(f) for f in [first_file_name] + other_files]

# save headers
headers = [next(f) for f in opened_files]

with open('out1.txt', 'w') as out1, open('out2.txt', 'w') as out2:
    # print headers to files
    print(*headers[0][1:6], sep='\t', file=out1)

    new_header = []
    for h in headers[1:]:
        new_header.extend(h[6:])

    print(*(['ID'] + new_header), sep='\t', file=out2)

    # merge() interleaves the already-sorted files by the (A, B) key,
    # groupby() then batches together all rows sharing that key
    for v, g in groupby(merge(*opened_files, key=lambda k: (k[1], k[2])), lambda k: (k[1], k[2])):
        lines = [*g]

        print(*lines[0][1:6], sep='\t', file=out1)

        out_line = [lines[0][1]]
        iter_lines = get_values(lines[1:])
        current_line = next(iter_lines)
        for current_file in other_files:
            if current_line[0] == current_file:
                # this file had a match for the key: take its extra columns
                out_line.extend(current_line[6:])
                current_line = next(iter_lines)
            else:
                # no row from this file for the key: pad with nulls
                out_line.extend(['null', 'null'])
        print(*out_line, sep='\t', file=out2)

Produces two files:

out1.txt:

A   B   num val scol
1   a1  1000    2   3
2   a2  456 7   2
3   a3  23  2   7
4   a4  800 7   3
5   a5  10  8   7

out2.txt:

ID  fcol    dcol    fcol2   dcol1
1   0.2 0.77    null    null
2   0.3 0.4 0.7 0.8
3   0.5 0.6 null    null
4   0.003   0.088   0.9 0.01
5   null    null    0.03    0.07

Andrej Kesely
  • This is the only answer that has gotten me close, but I need some clarifications. The number of columns in the files can change (20, 30, 10 columns), so I wonder how `yield ['null'] * 8` affects the result. When I increased the number of columns, the nulls for some columns were not written even though the particular key from the first file clearly does not exist in the file being compared. Also, the header for each column is required, which is why I had the dictionary containing the first key as column headers. How do I go about improving these concerns? – Starter Jan 03 '20 at 16:35
  • @Starter I edited my answer. Now the output files are with headers. Regarding the `yield ['null'] * 8` it's enough to yield only one value (I edited that too). Note: If you could do this with `dask` then better - I don't know very specifics of your input data. – Andrej Kesely Jan 03 '20 at 20:16
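
A possible way to generalize the answer to files with different column counts (a hypothetical tweak, not part of the answer as posted) is to derive each file's number of extra columns from the headers the code already collects, and pad with that many nulls instead of a hardcoded two:

# hypothetical sketch: compute how many 'null's each file needs from its header
# (these lists mirror what the answer's code collects via next() on each file)
headers = [
    ['f1.txt', 'A', 'B', 'num', 'val', 'scol'],
    ['a1.txt', 'A', 'B', 'num', 'val', 'scol', 'fcol', 'dcol'],
    ['a2.txt', 'A', 'B', 'num', 'val', 'scol', 'fcol2', 'dcol1'],
]
key_width = 6  # filename prefix plus the five shared columns
extra_counts = {h[0]: len(h) - key_width for h in headers[1:]}
print(extra_counts)   # {'a1.txt': 2, 'a2.txt': 2}

# the else-branch of the inner loop could then use:
#     out_line.extend(['null'] * extra_counts[current_file])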