5

I have a very large unsorted file, 1000GB, of ID pairs

  1. ID:ABC123 ID:ABC124
  2. ID:ABC123 ID:ABC124
  3. ID:ABC123 ID:ABA122
  4. ID:ABC124 ID:ABC123
  5. ID:ABC124 ID:ABC126

I would like to filter the file for

1) duplicates

example
ABC123 ABC124
ABC123 ABC124

2) reverse pairs (discard the second occurrence)

example
ABC123 ABC124
ABC124 ABC123

After filtering, the example file above would look like

  1. ID:ABC123 ID:ABC124
  2. ID:ABC123 ID:ABA122
  3. ID:ABC124 ID:ABC126

Currently, my solution is this

my %hash;

while (my $line = <FH>) {
    chomp $line;  # remove \n
    my ($id1, $id2) = split / /, $line;
    # skip the pair if it, or its reverse, has already been seen
    if (exists $hash{$id1 . $id2} || exists $hash{$id2 . $id1}) {
        next;
    }
    else {
        $hash{$id1 . $id2} = undef;  # store it in a hash
        print "$line\n";
    }
}

which gives me the desired results for smaller lists, but takes up too much memory for larger lists, as I am storing the hash in memory.

I am looking for a solution that will take less memory to implement. Some thoughts I have are

1) save the hash to a file, instead of memory

2) multiple passes over the file

3) sorting and uniquing the file with unix sort -u -k1,2

After I posted this on Computer Science Stack Exchange, an external sort algorithm was suggested.

bio-boris
  • If you switch the IDs of your first two lines, one of them will be duplicates with line 4, right? – Haifeng Zhang May 09 '14 at 22:26
  • Is this a one-off or a recurring task? If the latter, is there any chance to not add duplicates at the time of file creation? – Andrew Morton May 09 '14 at 22:27
  • How many lines are there or equivalently how long is the average line? What do the IDs look like? Like your example, 3 letters followed by 3 numbers? How many rows do you expect to remove? 10, 1000, 1% or 10%? How are they distributed? A few lines with many duplicates? Many lines with a few duplicates? – Daniel Brückner May 09 '14 at 22:28
  • @haifzhan If I switch the ids of the first two lines, lines 2 and 4 would be deleted. – bio-boris May 09 '14 at 22:42
  • @AndrewMorton This is a recurring task. It is useful to have the data in both with duplicates and with deduplication. – bio-boris May 09 '14 at 22:43
  • @DanielBrückner The line is ID followed by 6 alphanumeric characters. In some datasets, I expect to remove as much as 90%, but often I expect to remove 50% . They are distributed with many lines with many duplicates – bio-boris May 09 '14 at 22:43
  • Given the removal of reversed pairs, is the ordering of the IDs in the output significant at all? Are you just looking to see if two IDs are paired? So, if 1:ABC124 ABC123, is in the input (and survives) does it matter if 1:ABC123 ABC124 is the reflection of that line in the output file? – Trevor Tippins May 09 '14 at 23:06
  • @TrevorTippins The first instance of a pair is kept. From then on, all copies of the pairs are discarded, all reflections of the pair are discarded. – bio-boris May 09 '14 at 23:08
  • Are the letters all uppercase? Are letters and digits separated or can 7QF3AZ occur? – Daniel Brückner May 09 '14 at 23:10
  • @DanielBrückner All IDS are uppercase and alphanumeric – bio-boris May 09 '14 at 23:11
  • But no special structure like in your example - 3 letters first, then 3 digits? – Daniel Brückner May 09 '14 at 23:13
  • @DanielBrückner no special structure or order no – bio-boris May 09 '14 at 23:14

6 Answers

3

You could use map reduce for the tasks.

Map-Reduce is a framework for batch-processing that allows you to easily distribute your work among several machines, and use parallel processing without taking care of synchronization and failure tolerance.

map(id1, id2):
    if id1 < id2:
        yield (id1, id2)
    else:
        yield (id2, id1)

reduce(id1, list<ids>):
    ids = hashset(ids)  // fairly small per id
    for each id2 in ids:
        yield (id1, id2)

The map-reduce implementation will allow you to distribute your work across several machines with very little extra programming work.
This algorithm also needs only a small, fixed number of passes over the data (so the total work is linear in the input size) and a fairly small amount of extra memory, assuming each ID is associated with a small number of other IDs.
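
To make the pseudocode above concrete, here is a minimal single-process sketch of the same map, shuffle and reduce steps. It only simulates what a framework such as Hadoop would do across machines; the names `map_pair`, `reduce_ids` and `run_local` are made up for illustration, and the in-memory `groups` dictionary stands in for the framework's on-disk shuffle.

```python
from collections import defaultdict


def map_pair(id1, id2):
    """Map step: emit the pair in canonical order (smaller ID first)."""
    return (id1, id2) if id1 < id2 else (id2, id1)


def reduce_ids(id1, ids):
    """Reduce step: deduplicate the partner IDs collected for one key."""
    for id2 in sorted(set(ids)):
        yield id1, id2


def run_local(pairs):
    """Simulate map, shuffle and reduce in a single process."""
    groups = defaultdict(list)
    for id1, id2 in pairs:
        key, value = map_pair(id1, id2)
        groups[key].append(value)  # stands in for the framework's shuffle
    for key in sorted(groups):
        yield from reduce_ids(key, groups[key])


sample = [
    ("ABC123", "ABC124"),
    ("ABC123", "ABC124"),
    ("ABC123", "ABA122"),
    ("ABC124", "ABC123"),
    ("ABC124", "ABC126"),
]
result = list(run_local(sample))
```

As the answer notes, the output pairs come out in canonical order rather than in their original order, so `("ABC123", "ABA122")` is emitted as `("ABA122", "ABC123")`.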

Note that this will alter the order of pairs (make first id second in some cases)
If the order of original ids does matter, you can pretty easily solve it with an extra field.
Also note that the order of data is altered, and there is no way to overcome it when using map-reduce.

For better efficiency, you might want to add a combiner, which will do the same job as the reducer in this case, but if it will actually help depends a lot on the data.

Hadoop is an open source library that implements Map-Reduce, and is widely used in the community.

amit
  • So what you are saying is **1) use more machines to get the memory I need ** 2) chunk the data and distribute it if the data is randomly spaced throughout the file, how do I know if what I am looking for is in the the chunk that I am searching? Each id can be associated with a very large number of IDS – bio-boris May 09 '14 at 22:27
  • Would the downvoter please comment? This answer gives real engineering solution when dealing with big-data, and 1TB is usually considered big data. – amit May 09 '14 at 22:31
  • can you provide more details, as I know MapReduce is a framework, and Hadoop is for storage, how to process those 1000GB big data using MapReduce on Hadoop? – Haifeng Zhang May 09 '14 at 22:32
  • @user3574820 This is not what I am saying. The work is split to chunks and you process each and yield (id1,id2) according to the order, next - the reducer makes sure you process all the pairs that have the same first id on the same machine. The framework takes care of it for you. – amit May 09 '14 at 22:35
  • Not my downvote (it never is), but does map-reduce automatically handle the case where the data do not fit in the combined memory of the machines involved? – David Eisenstat May 09 '14 at 22:36
  • @haifzhan Hadoop is not only storage. It also implements Map-Reduce. I gave pseudocode for the map and reduce stages, which can easily be implemented on Hadoop or any other implementing library. For more details, look at the [org.apache.hadoop.mapreduce](http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/package-summary.html) package. – amit May 09 '14 at 22:37
  • @DavidEisenstat It is working on files, not memory, the only memory used in this algorithm is the set creation in the reducer, and the assumption that for each id: {(id,id') | for all existing pairs in DB} is fairly small. There are ways to go around it if it's not the case, but from my experience - it seldom is. – amit May 09 '14 at 22:38
  • Really, none of the comments actually provided reasoning to the downvote, all of them were just misunderstandings of the map-reduce framework. this is ridiculous :| – amit May 09 '14 at 22:44
  • @amit actually I played with HDFS and map reduce for several weeks to solve my data analysis problem, and I still dont know how to make them work properly, how many cluster nodes is fit to process 1000GB data, how to make the combination work efficiently, thanks – Haifeng Zhang May 09 '14 at 22:49
  • @haifzhan TBH, I don't know. It depends on how many nodes you actually have on your cluster, their IO/cpu/ram performance, and the actual data processed. best way is to benchmark a bunch of configurations and decide. I have little experience with the cluster administration itself, so I am afraid I cannot help you there. – amit May 09 '14 at 22:58
  • "ids = hashset(ids) //fairly small per id" -- how do you know this is true? If, for example, 50% of dataset have same `id1`, then 500Gb of data will be moved to a single node for reduction. If you want to use MR, you can just take the whole record (with reordered IDs) as a key - Hadoop will group them and send to appropriate host. E.g. `def map(k, record): return record; def reducer(k, records): return records[0]; combiner = reducer`. – ffriend May 10 '14 at 01:58
  • But even in this case Hadoop will do a lot of unnecessary work sorting records for grouping and sending them over network. If you want more flexible solution, try out [Spark](http://spark.apache.org/), which gives you control over all parts of distributed operations. – ffriend May 10 '14 at 01:59
  • @ffriend As I said, this is an assumption that usually holds true. If it is not the case, make (id1,id2) the key, and the value will just be a 'junk' 1 or something. The thing is that the overhead for using the entire data as the key is bigger, since more reducers are created (not reducer nodes, more reducers on the same machines...), and this overhead might be expensive. However, if the assumption I mentioned does not hold, it should be done. – amit May 10 '14 at 07:25
3

Depending on the details of your data (see my comment on the question) a Bloom filter may be a simple way to get away with two passes. In the first pass insert every pair into the filter after ordering the first and the second value and generate a set of possible duplicates. In the second pass filter the file using the set of possible duplicates. This obviously requires that the set of (possible) duplicates is not itself large.
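
A minimal sketch of the two-pass idea described above, under a few assumptions: the `BloomFilter` class is a toy written for illustration (not a library API), the filter size and hash count are arbitrary, and `lines` must be something that can be iterated twice (for a real file you would reopen it for the second pass).

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter; sizes are illustrative, not tuned."""

    def __init__(self, n_bits, n_hashes):
        self.n_bits = n_bits
        self.n_hashes = n_hashes
        self.bits = bytearray((n_bits + 7) // 8)

    def _positions(self, key):
        # Derive n_hashes bit positions from one digest (double hashing).
        digest = hashlib.sha256(key.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.n_bits for i in range(self.n_hashes)]

    def add(self, key):
        """Insert key; return True if it was possibly present already."""
        seen = True
        for pos in self._positions(key):
            byte, mask = pos >> 3, 1 << (pos & 7)
            if not self.bits[byte] & mask:
                seen = False
                self.bits[byte] |= mask
        return seen


def canonical(line):
    """Order the two IDs so a pair and its reverse map to the same key."""
    return " ".join(sorted(line.split()))


def dedupe(lines, n_bits=1 << 20, n_hashes=5):
    # Pass 1: collect keys the filter has (possibly) seen before.
    bloom = BloomFilter(n_bits, n_hashes)
    candidates = set()
    for line in lines:
        key = canonical(line)
        if bloom.add(key):
            candidates.add(key)
    # Pass 2: only candidate keys need exact bookkeeping.
    emitted = set()
    for line in lines:
        key = canonical(line)
        if key in candidates:
            if key in emitted:
                continue
            emitted.add(key)
        yield line


sample = [
    "ID:ABC123 ID:ABC124",
    "ID:ABC123 ID:ABC124",
    "ID:ABC123 ID:ABA122",
    "ID:ABC124 ID:ABC123",
    "ID:ABC124 ID:ABC126",
]
result = list(dedupe(sample))
```

False positives in the filter only enlarge `candidates`; they never cause a unique line to be dropped, because candidate keys are still checked exactly in the second pass. The memory cost is the filter plus the candidate set, which is why the approach only pays off when duplicates are rare.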

Given the characteristics of the data set - up to around 25 billion unique pairs and roughly 64 bits per pair - the result will be on the order of 200 GB. So you either need a lot of memory, many passes or many machines. Even a Bloom filter will have to be huge to yield an acceptable error rate.
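
The figures above can be reproduced with a short back-of-the-envelope calculation. The assumptions are mine, based on the question and its comments: 20-byte lines as in the example, 50% of lines removed, IDs of 6 uppercase alphanumeric characters, and a 1% false-positive target for the Bloom filter (using the standard sizing formula m = -n ln p / (ln 2)^2).

```python
import math

FILE_BYTES = 1000 * 10**9                      # 1000 GB input
LINE_BYTES = len("ID:ABC123 ID:ABC124\n")      # 20 bytes per line

total_lines = FILE_BYTES // LINE_BYTES         # about 50 billion lines
unique_pairs = total_lines // 2                # assuming 50% are removed

# 36 possible symbols in each of 6 positions
bits_per_id = math.ceil(math.log2(36 ** 6))
bits_per_pair = 2 * bits_per_id

# Exact set of unique pairs, each stored in bits_per_pair bits
exact_set_gb = unique_pairs * bits_per_pair / 8 / 10**9

# Optimal Bloom filter size in bits for false-positive rate p
p = 0.01
bloom_bits = -unique_pairs * math.log(p) / math.log(2) ** 2
bloom_gb = bloom_bits / 8 / 10**9

print(f"{total_lines:,} lines, {unique_pairs:,} unique pairs")
print(f"{bits_per_pair} bits per pair, exact set ~{exact_set_gb:.0f} GB")
print(f"Bloom filter at p={p}: ~{bloom_gb:.0f} GB")
```

Under these assumptions the exact set needs about 200 GB, and even a 1% Bloom filter needs roughly 30 GB, which is consistent with the answer's point that the filter itself has to be huge.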

sortbenchmark.org can provide some hints on what is required, because the task is not too different from sorting. The 2011 winner used 66 nodes, each with 2 quad-core processors, 24 GiB memory and 16 disks of 500 GB, and sorted 1,353 GB in 59.2 seconds.

Daniel Brückner
  • processing 1TB of data on a single machine is going to be hazardous, especially since the comments suggest this is going to be done more than once (on several data sets). – amit May 09 '14 at 22:49
  • With a HDD you can read 1 TB in less than 2 hours, faster with a RAID system and even faster with SSDs. You can still scale to multiple machines by processing different files on different machines. – Daniel Brückner May 09 '14 at 22:59
3

As an alternative to rolling your own clever solution, you could load the data into a database and then use SQL to get the subset that you need. Many great minds have already solved the problem of querying 'big data', and 1000GB is not really that big, all things considered...

Tom
  • Please, provide SQL query that solves the problem. Simply putting data into DB doesn't do it. – ffriend May 10 '14 at 01:41
  • I think that should be a new question, tagged appropriately. – Tom May 12 '14 at 00:51
  • This question is about removing duplicates from the dataset, and I don't see how your response answers this question. – ffriend May 12 '14 at 12:00
  • This question is about 'How to filter a very very large file' - just look at the title. If the OP wants help constructing the SQL query that will remove duplicates etc. then he should ask a new question, tagged with 'SQL', as he will get a lot more help this way. – Tom May 13 '14 at 22:34
  • OP wants to filter his file/dataset. You suggested using SQL, but didn't specify what kind of SQL/what query. It's like saying "to filter your file just use script". But what script? What should be _in_ this script? Just "script" is not an answer, neither "SQL" is. You can try to sort your data, or call "distinct", or use "group by" or whatever. All these solutions have different cost, and not all of them (if any) may really be applicable. "Use SQL" is just too generic answer to really solve the problem. – ffriend May 14 '14 at 07:27
  • Which is why I suggest asking a new question, once the OP accepts the wisdom of my approach. – Tom May 14 '14 at 12:48
  • So you don't propose solution, but only a tool, not directly related to the question. Okay. – ffriend May 14 '14 at 13:10
  • What I propose is an approach, that scales well. SQL, if the OP would add that tag and rephrase the question, I'm sure he would have the answer within minutes. Remove duplicates and reverse pairs, well lets remove the reverse pairs problem by ordering them in a predictable way, and then removing duplicates is definitely something that's been done before... – Tom May 19 '14 at 01:39
  • It doesn't scale well, this is the problem. Removing duplicates was never easy task for large datasets. In general, you need either put all the data into memory on a single machine (expensive in memory) or use external sorting (expensive in time). Both of these options are undesirable for large datasets, which is the whole point of the question. SQL has several related expressions, but none of them solve the problem by themselves. And if you think it out and find the way to remove duplicates in SQL, most likely you will see how much simpler it could be done by custom script without SQL. – ffriend May 19 '14 at 07:21
  • @ffriend : I'm with Tom on this one. These "custom script" would end up simply duplicating the hard work that has been done over many years in the field of databases. 1TB is a huge amount of data to process by "custom script" and I'd wager that only the most cunning of developers could figure out a way of doing this without immediately running out of memory. OTOH, sql does this s**t all the time. – spender May 19 '14 at 22:08
  • @spender: So, please, provide SQL statement that will remove duplicates from `some_table` in RDBMS of your choice. I'm pretty sure it's possible (and I can even think of some solutions), but so far there's no solution at all in this thread. – ffriend May 19 '14 at 23:06
  • I have my wrong thinking hat on ATM. Will think about this tomorrow. – spender May 19 '14 at 23:46
  • @ffriend I was trying to point the OP in the right direction without giving 'teh codez'... This question http://stackoverflow.com/questions/18418122/delete-duplicates-from-large-dataset-100mio-rows might also provide some inspiration. – Tom May 20 '14 at 12:05
  • But how do you know it's the right direction? Database won't magically solve all your problems - you still have to know how things work under the hood. Naive use of `distinct` or `group by` or whatever you like may result in sorting, which is terribly slow for large datasets. – ffriend May 20 '14 at 13:32
  • @ffriend : So I added an answer for you to peruse. – spender May 28 '14 at 03:19
1

Your approach is almost fine, you just need to move your hashes to disk instead of keeping them in memory. But let's go step by step.

Reorder IDs

It's inconvenient to work with records whose IDs appear in different orders. So, if possible, reorder the IDs; if not, create an additional key for each record that holds the IDs in canonical order. I will assume you can reorder the IDs (I'm not very good at Bash, so my code will be in Python):

with open('input.txt') as file_in, open('reordered.txt', 'w') as file_out:
    for line in file_in:
        # split() without arguments also drops the trailing newline
        reordered = ' '.join(sorted(line.split()))  # reorder IDs
        file_out.write(reordered + '\n')

Group records by hash

You cannot filter all records at once, but you can split them into a reasonable number of parts. Each part is identified by the hash of the records in it, e.g.:

N_PARTS = 1000
with open('reordered.txt') as file_in:
    for line in file_in:
        # note: hash() of a str is only stable within one Python process
        part_id = hash(line) % N_PARTS  # part_id will be between 0 and (N_PARTS-1)
        with open('part-%8d.txt' % part_id, 'a') as part_file:
            part_file.write(line)  # line already ends with '\n'

The choice of hash function is important here. I used Python's standard hash() (modulo N_PARTS), but you may need another function that spreads the records close to uniformly across the hash values. If the hash function works more or less OK, instead of 1 large file of 1 TB you will get 1000 small files of ~1 GB each. And most importantly, you have a guarantee that no two identical records end up in different parts.

Note that opening and closing a part file for each line isn't really a good idea, since it generates countless system calls. A better approach would be to keep the files open (you may need to raise the open-file limit with ulimit -n), use batching, or even write to a database. This is up to the implementation; I will keep the code simple for the purpose of demonstration.
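
As a rough illustration of the "keep the files open" variant mentioned above, the sketch below opens every part file once and routes each line by a hash of its content. The function name `partition`, the file naming and the use of `zlib.crc32` (chosen because it gives the same value on every run, unlike `hash()` on strings) are my assumptions, not part of the original answer.

```python
import os
import tempfile
import zlib
from contextlib import ExitStack


def partition(lines, out_dir, n_parts):
    """Write each line to the part file selected by its hash.

    All part files are opened once and stay open until the end, so
    n_parts must stay below the process's open-file limit.
    """
    with ExitStack() as stack:
        parts = [
            stack.enter_context(
                open(os.path.join(out_dir, "part-%d.txt" % i), "w")
            )
            for i in range(n_parts)
        ]
        for line in lines:
            record = line.rstrip("\n")
            part_id = zlib.crc32(record.encode()) % n_parts
            parts[part_id].write(record + "\n")


# Small demonstration in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    partition(["a b\n", "c d\n", "a b\n"], tmp, n_parts=4)
    contents = []
    for i in range(4):
        with open(os.path.join(tmp, "part-%d.txt" % i)) as part_file:
            contents.append(part_file.read())
```

Identical records always land in the same part file, which is the property the next step relies on.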

Filter each group

Files of this size are much easier to work with, aren't they? You can load each one into memory and easily remove duplicates with a hash set:

with open('output.txt', 'w') as file_out:
    for i in range(N_PARTS):                          # for each part
        unique = set()                                # holds one part at a time
        with open('part-%8d.txt' % i) as part_file:
            for line in part_file:                    # for each line
                if line.strip():                      # ignore blank lines
                    unique.add(line)
        file_out.writelines(unique)                   # lines already end with '\n'

This approach uses some heavy I/O operations and 3 passes, but it is linear in time and uses configurable amount of memory (if your parts are still too large for a single machine, just increase N_PARTS).

ffriend
1

So if this were me I'd take the database route as described by @Tom in another answer. I'm using Transact SQL here, but it seems that most of the major SQL databases have similar windowing/ranking row_number() implementations (except MySQL, which only gained window functions in version 8.0).

I would probably run a two sweep approach, first rewriting the id1 and id2 columns into a new table so that the "lowest" value is in id1 and the highest in id2.

This means that the subsequent task is to find the dupes in this rewritten table.

Initially, you would need to bulk-copy your source data into the database, or generate a whole bunch of insert statements. I've gone for the insert here, but would favour a bulk insert for big data. Different databases have different means of doing the same thing.

CREATE TABLE #TestTable
(
    id int,
    id1 char(6) NOT NULL,
    id2 char(6) NOT NULL
)

insert into 
#TestTable (id, id1, id2) 
values 
    (1, 'ABC123', 'ABC124'),
    (2, 'ABC123', 'ABC124'),
    (3, 'ABC123', 'ABA122'),
    (4, 'ABC124', 'ABC123'),
    (5, 'ABC124', 'ABC126');

select 
    id, 
    (case when id1 <= id2 
        then id1 
        else id2 
    end) id1,
    (case when id1 <= id2 
        then id2 
        else id1 
    end) id2
    into #correctedTable 
from #TestTable

create index idx_id1_id2 on #correctedTable (id1, id2, id)

;with ranked as
(select 
    ROW_NUMBER() over (partition by id1, id2 order by id) dupeRank, 
    id,
    id1,
    id2
 from #correctedTable)

select id, id1, id2 
  from ranked where dupeRank = 1

drop table #correctedTable
drop table #TestTable

Which gives us the result:

3           ABA122 ABC123
1           ABC123 ABC124
5           ABC124 ABC126
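
For readers without a SQL Server instance, the same two-step idea (canonicalise each pair, then keep the lowest row id per pair) can be tried with Python's built-in `sqlite3` module. This is only a sketch: it uses `GROUP BY` with SQLite's two-argument scalar `MIN`/`MAX` instead of `ROW_NUMBER()`, and the table and column names (`pairs`, `lo`, `hi`, `first_id`) are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pairs ("
    "id INTEGER PRIMARY KEY, id1 TEXT NOT NULL, id2 TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO pairs (id, id1, id2) VALUES (?, ?, ?)",
    [
        (1, "ABC123", "ABC124"),
        (2, "ABC123", "ABC124"),
        (3, "ABC123", "ABA122"),
        (4, "ABC124", "ABC123"),
        (5, "ABC124", "ABC126"),
    ],
)

# Inner query: put the smaller ID in lo and the larger in hi.
# Outer query: keep the first (lowest) row id for each (lo, hi) pair.
rows = conn.execute(
    """
    SELECT MIN(id) AS first_id, lo, hi
    FROM (
        SELECT id, MIN(id1, id2) AS lo, MAX(id1, id2) AS hi
        FROM pairs
    )
    GROUP BY lo, hi
    ORDER BY lo, hi
    """
).fetchall()
conn.close()

for row in rows:
    print(row)
```

On the five sample rows this returns the same three pairs as the Transact SQL version. Whether either query scales to a terabyte depends on how the database implements the grouping, as discussed in the comments on the other answer.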
spender
  • 1
    Thanks for solution in SQL, I faithfully upvote your answer. I don't know how `row_number...partition by` works, but I assume it uses bucket sort, which is quite reasonable. Also, you may notice that your solution is basically equivalent to mine - both reduce to grouping records in linear time. Some differences: you use (id1, id2) as grouping function, while I deal with more abstract hash function; you index your data for partitioning, while I partition records on-line when writing to files. The rest is almost the same, and to my knowledge this is the only efficient algorithm for the task. – ffriend May 28 '14 at 08:37
0

I'm not trying to answer the question, merely adding my 0.02€ to the other answers.

To me, a must-do is to split the task into multiple smaller tasks, as was already suggested. That applies to both the control flow and the data structures.

This is the way Merge Sort was used with tape drives to sort big data volumes (larger than memory and larger than random-access disk). In today's terms it would mean that the storage is distributed across multiple (networked) disks or networked disk sectors.

There are already languages and even operating systems that support this kind of distribution at different granularities. Some 10 years ago I had my hot candidates for this kind of task, but I don't remember the names and things have changed since then.

One of the first was the distributed Linda operating system, with parallel processors attached/disconnected as needed. Its basic coordination structure was a huge distributed Tuple Space data structure where processors read/wrote tasks and wrote results.

A more recent approach with a similar distribution of work is multi-agent systems (the Czech Wikipedia article perhaps contains more links).

Related Wikipedia articles are Parallel Computing, Supercomputer Operating Systems and List of concurrent and parallel programming languages.

I don't mean to say that you should buy some processor time on a supercomputer and run the computation there. I'm listing them as algorithmic concepts to study.

Many times there will be free or open source software available that allows you to do the same on a small scale, starting with cheap software and available hardware. E.g. back at university in 1990 we used the night time in the computer lab to calculate ray-traced 3D images. It was a very computationally expensive process, as for every pixel you must cast a "ray" and calculate its collisions with the scene model. On 1 machine, with a scene containing some glasses and mirrors, it ran at about 1 pixel per second (C++ and optimized assembly language code). At the lab we had some ~15 PCs available, so the final time might be reduced ~15 times (I386, I486 and an image of 320x200 in 256 colors). The image was split into standalone tasks, computed in parallel and then merged into one. The approach scaled well at that time, and a similar approach would help you today too.

There always was and always will be something like "big data": so big that it does not fit into RAM, does not fit on disk, and can't be computed on 1 computer in finite time.

Such tasks have been solved successfully since the very first days of computing. Terms like B-Tree, tape drive, seek time, Fortran, Cobol, IBM AS/400 come from that era. If you're like the engineers of those times, then you'll for sure come up with something smart :)

EDIT: actually, you are probably looking for External Sorting
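
For reference, a minimal sketch of what external sorting could look like for this task: sort fixed-size chunks in memory, write each as a sorted run on disk, then merge the runs and drop adjacent duplicates. The function names, the `chunk_size` parameter and the run file naming are assumptions made for the example. Note that the output is in sorted order with each pair canonicalised, so the "keep the first occurrence as written" requirement from the question would need an extra field.

```python
import heapq
import os
import tempfile


def canonical(line):
    """Order the two IDs so a pair and its reverse compare equal."""
    return " ".join(sorted(line.split()))


def external_unique(lines, chunk_size, tmp_dir):
    """Yield each canonical pair once, using sorted runs on disk."""
    run_paths = []
    chunk = []

    def flush():
        # Sort the current chunk and write it out as one run file.
        if not chunk:
            return
        chunk.sort()
        path = os.path.join(tmp_dir, "run-%d.txt" % len(run_paths))
        with open(path, "w") as run:
            run.writelines(chunk)
        run_paths.append(path)
        chunk.clear()

    for line in lines:
        chunk.append(canonical(line) + "\n")
        if len(chunk) >= chunk_size:
            flush()
    flush()

    # k-way merge of the sorted runs; equal keys arrive next to each other.
    runs = [open(path) for path in run_paths]
    try:
        previous = None
        for key in heapq.merge(*runs):
            if key != previous:
                yield key.rstrip("\n")
                previous = key
    finally:
        for run in runs:
            run.close()


sample = [
    "ID:ABC123 ID:ABC124",
    "ID:ABC123 ID:ABC124",
    "ID:ABC123 ID:ABA122",
    "ID:ABC124 ID:ABC123",
    "ID:ABC124 ID:ABC126",
]
with tempfile.TemporaryDirectory() as tmp:
    result = list(external_unique(sample, chunk_size=2, tmp_dir=tmp))
```

Only one chunk is held in memory at a time, and the merge reads the runs sequentially, so memory use is controlled by `chunk_size` rather than by the size of the input.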

xmojmr