5

I have about 2 million records, each with about 4 string fields that need to be checked for duplicates. To be more specific, the fields are name, phone, address and fathername, and I must check every record for duplicates against the rest of the data using all of these fields. The resulting unique records need to be written to the db.

I have been able to implement mapreduce and iterate over all records. The task rate is set to 100/s and the bucket size to 100. Billing is enabled.

Currently everything is working, but performance is very slow: I have been able to complete the dedupe processing of only 1,000 records out of a test dataset of 10,000 in 6 hours.

The current design in java is:

  1. In every map iteration, I compare the current record with the previous record
  2. The previous record is a single record in the db which acts like a global variable; I overwrite it with another previous record in each map iteration
  3. The comparison is done using an algorithm and the result is written to the db as a new entity
  4. At the end of one mapreduce job, I programmatically create another job
  5. The previous-record variable lets the job compare the next candidate record with the rest of the data (a simplified sketch of this map step follows this list)
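
Roughly, each map call does something like the sketch below (a simplified illustration only; the kind, property and helper names are placeholders, not my actual code):

    import com.google.appengine.api.datastore.DatastoreService;
    import com.google.appengine.api.datastore.DatastoreServiceFactory;
    import com.google.appengine.api.datastore.Entity;
    import com.google.appengine.api.datastore.EntityNotFoundException;
    import com.google.appengine.api.datastore.Key;
    import com.google.appengine.api.datastore.KeyFactory;

    // Simplified sketch of the current map step; entity kinds, properties and helpers are placeholders.
    public class DedupeMapStep {
        private final DatastoreService ds = DatastoreServiceFactory.getDatastoreService();

        public void map(Entity current) throws EntityNotFoundException {
            // The "previous record" is a single datastore entity acting like a global variable.
            Key prevKey = KeyFactory.createKey("PreviousRecord", "singleton");
            Entity previous = ds.get(prevKey);

            // Compare the candidate with the previous record on all four fields.
            boolean duplicate = isDuplicate(current, previous);

            // Write the comparison result to the db as a new entity.
            Entity result = new Entity("DedupeResult");
            result.setProperty("recordKey", current.getKey());
            result.setProperty("duplicate", duplicate);
            ds.put(result);

            // Overwrite the previous record so the next map call compares against this one.
            previous.setPropertiesFrom(current);
            ds.put(previous);
        }

        private boolean isDuplicate(Entity a, Entity b) {
            // Placeholder: the real comparison algorithm uses name, phone, address and fathername.
            return a.getProperty("name").equals(b.getProperty("name"))
                && a.getProperty("phone").equals(b.getProperty("phone"))
                && a.getProperty("address").equals(b.getProperty("address"))
                && a.getProperty("fathername").equals(b.getProperty("fathername"));
        }
    }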

I am ready to throw any amount of GAE resources at this to achieve it in the shortest time.

My Questions are:

  1. Will the accuracy of the dedupe (checking for duplicates) be affected by parallel jobs/tasks?
  2. How can this design be improved?
  3. Will this scale to 20 million records?
  4. What is the fastest way to read/write variables (not just counters) during map iterations that can be shared across one mapreduce job?

Freelancers are most welcome to assist with this.

Thanks for your help.

charming30
  • More info: To dedupe 100 records, I must loop 100x100 times to compare every record with all the others. So I am running 100x100 mapreduce jobs one after another, triggered programmatically at the end of each job. – charming30 Jul 21 '11 at 02:54
  • Do you really need GAE/mapreduce for this? It would not be hard to do this in memory very quickly in, e.g., Java. @jiggy's suggestion of using a HashSet approach would work fine. – jkraybill Jul 21 '11 at 03:28
  • @jkraybill: You are right, we have a Java client which does this; we want to speed it up using distributed computing. Really hoping to build a faster and scalable solution. – charming30 Jul 21 '11 at 03:53

5 Answers

4

You should take advantage of the Reducer to do the equivalent of a sort -u for each field. You'll need to do one M/R job per field. You would make the field you are comparing the key in the mapper, then in the reducer you'd get all of the records with the same name grouped together and you could mark them. The second pass would be for the phone, etc. Depending on your cluster size each pass should be very fast.
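
For illustration, a single per-field pass could look roughly like this (a minimal sketch using the standard Hadoop Java API; the class names, the field index and the tab-delimited layout are assumptions, not your actual setup):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // One per-field dedupe pass: key the records on a single field (here field 0 = name),
    // so the reducer sees all records sharing that value and can mark the duplicates.
    public class FieldDedupe {

        public static class FieldMapper extends Mapper<LongWritable, Text, Text, Text> {
            private static final int FIELD = 0; // 0 = name; run another pass with the phone column, etc.

            @Override
            protected void map(LongWritable offset, Text line, Context context)
                    throws IOException, InterruptedException {
                String[] fields = line.toString().split("\t");
                // The field being deduplicated becomes the key, so the reducer groups on it.
                context.write(new Text(fields[FIELD]), line);
            }
        }

        public static class DuplicateMarkerReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text fieldValue, Iterable<Text> records, Context context)
                    throws IOException, InterruptedException {
                int seen = 0;
                for (Text record : records) {
                    // The first record with this field value is kept; the rest are marked as duplicates.
                    String marker = (seen++ == 0) ? "UNIQUE" : "DUPLICATE";
                    context.write(new Text(marker), record);
                }
            }
        }
    }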

Edit: @Olaf pointed out the OP probably wants totally unique records. Using a multipart key, this can be a one-line Hadoop streaming command that gets the unique set. I'll add that soon.

Edit2: The promised streaming command that will perform a sort -u on the entire file. This assumes your records are stored one per line, with each field (name, fathername, phone number and address) tab-delimited, in one or more files in the directory hdfs://example/dedup/input/. The actual HDFS path can be anything, or you could use a single file. The output will be multiple part-* files in hdfs://example/dedup/output/. You also might need to change the command, as your hadoop-streaming.jar might be in a slightly different place. If you have more than 4 fields, change the value of stream.num.map.output.key.fields.

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -D stream.num.map.output.key.fields=4 \
        -input hdfs://example/dedup/input/ -output hdfs://example/dedup/output/ \
        -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
        -reducer /usr/bin/uniq

To retrieve the unique results to a file on the local filesystem, run this:

    $HADOOP_HOME/bin/hadoop fs -cat \
        'hdfs://example/dedup/output/part-*' > results.txt

One note: since every column is part of the key, streaming will add a null value, so each row will have an extra tab at the end. That is easily stripped off.

If you want to do more than just get the uniq output, you could plug in your own Java class or command-line program instead of /usr/bin/uniq. That class could, for example, mark all records that you find to be duplicated, by adding a fifth column to your input that is the record's DB ID. Hadoop by default partitions results by the whole key, so each group of duplicate records will be streamed together to a single reducer, and this will all happen in parallel. Please take a look at the streaming documentation for more info.
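
For example, a drop-in replacement for /usr/bin/uniq might look roughly like this sketch (it assumes the fifth tab-separated column is the record's DB ID and simply prints which IDs are duplicates; the class name and output format are illustrative only):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    // Sketch of a custom streaming reducer used in place of /usr/bin/uniq.
    // Assumes 5 tab-separated columns per line: name, fathername, phone, address, dbId,
    // and that lines arrive sorted on the first four columns, so duplicates are adjacent.
    public class DedupeReducer {
        public static void main(String[] args) throws Exception {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            String previousKey = null;
            String line;
            while ((line = in.readLine()) != null) {
                String[] cols = line.split("\t");
                String key = cols[0] + "\t" + cols[1] + "\t" + cols[2] + "\t" + cols[3];
                if (key.equals(previousKey)) {
                    // Same four-field key as the previous line: mark this row's DB ID as a duplicate.
                    System.out.println("DUPLICATE\t" + cols[4]);
                } else {
                    System.out.println("UNIQUE\t" + line);
                }
                previousKey = key;
            }
        }
    }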

cftarnas
Why can't one define a key consisting of all 4 fields in question? It's clearly the right way to do it in a straight Hadoop MapReduce job. Is there some framework in play that would not allow that? – Olaf Jul 21 '11 at 17:54
You could -- and that was my first thought -- but after a second read of the question my take was that the OP was attempting to detect duplicates in any of the fields, not all of them together (detect a duplicate phone number even if the name is different). – cftarnas Jul 22 '11 at 00:08
@cftarnas: My understanding is that "name" is the first name and "fathername" is the last name, so records for John Carpenter and John Smith should not be deduped. – Olaf Jul 22 '11 at 01:03
  • @Olaf: After another read of the question I now agree with you. So using one pass and a multipart key would be what he needs. Should be quite fast. Edited above to reflect that. – cftarnas Jul 22 '11 at 01:13
@Olaf: Thanks for your valuable insights. In my case, all fields must be considered for the comparison and I am using the Dice coefficient algorithm to find matches. I am open to using anything outside GAE too, if that's simpler and faster. I am new to hadoop, I will surely consider your inputs. – charming30 Jul 22 '11 at 19:11
  • @cftarnas: Thanks for your inputs, I will surely consider them. – charming30 Jul 22 '11 at 19:13
  • You'll want to write your own Comparator class and tell Hadoop to use that. – cftarnas Jul 22 '11 at 21:51
3

I see 2 ways to approach this problem:

  1. (If you only need to do it once) AppEngine creates a property index for every property in your entity (unless you ask it not to). Create a backend, run a query like "SELECT * FROM <kind> ORDER BY <property>" in batches using cursors, detect duplicated properties and fix/delete those. You might be able to parallelize this, but it's tricky on shard boundaries and you will probably have to write all the code yourself.

  2. You can use the mapper framework to do it more slowly, but run in parallel. This approach also allows you to efficiently dedupe data on insert. Introduce a new entity to hold unique property values, say "UniquePhoneNumber". The entity should hold a phone number as its key and a reference to the entity with this phone number. Now run a map, and for each record do a lookup for its UniquePhoneNumber. If it's found and its reference is valid, delete the duplicate. If not, create a new one with the correct reference. This way it's even possible to repoint the reference to another record if you need to. Make sure that you read the UniquePhoneNumber and create/update it inside a single transaction; otherwise duplicates won't be detected. (A minimal sketch of this lookup follows this list.)
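
A minimal sketch of that transactional lookup with the low-level datastore API might look like this (the entity, property and method names are examples only, not a definitive implementation):

    import com.google.appengine.api.datastore.*;

    // Sketch of the "unique property entity" idea: the phone number is the key of a
    // UniquePhoneNumber entity, so a transactional get/put detects duplicates on insert.
    public class PhoneDeduper {
        private final DatastoreService ds = DatastoreServiceFactory.getDatastoreService();

        /** Returns true if this record owns the phone number, false if it is a duplicate. */
        public boolean claimPhoneNumber(String phone, Key recordKey) {
            Transaction txn = ds.beginTransaction();
            try {
                Key uniqueKey = KeyFactory.createKey("UniquePhoneNumber", phone);
                try {
                    Entity existing = ds.get(txn, uniqueKey);
                    txn.commit();
                    // Already claimed: the caller can delete/merge the duplicate record,
                    // or repoint the reference if the old one is no longer valid.
                    return recordKey.equals(existing.getProperty("recordKey"));
                } catch (EntityNotFoundException notFound) {
                    // Not seen before: claim the phone number inside the same transaction.
                    Entity unique = new Entity(uniqueKey);
                    unique.setProperty("recordKey", recordKey);
                    ds.put(txn, unique);
                    txn.commit();
                    return true;
                }
            } finally {
                if (txn.isActive()) txn.rollback();
            }
        }
    }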

mikea
Thanks. I completely agree with you. I originally started with tasks and later moved to the mapper framework. I have redesigned the above solution to make it faster: I now run a mapper which creates n mappers, each of which is given a unique record to compare with the rest of the records. I emit values to the db as they are found and use the job's callbacks to summarize the whole process. I am yet to test it at large scale. Thanks for your useful inputs. – charming30 Jul 22 '11 at 19:24
1

Generate a hash code for each record. Loop through your records and insert each one into a Set based on the hash code. The Set is now a deduped list in O(N).
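
A minimal in-memory sketch of that idea (the Record class and its fields are placeholders; note that equals() still compares the actual fields, which is what guards against hash collisions):

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Objects;
    import java.util.Set;

    // Record's hashCode()/equals() are built from the four fields,
    // so a Set drops exact duplicates in a single pass over the data.
    public class HashDedupe {
        static final class Record {
            final String name, phone, address, fathername;
            Record(String name, String phone, String address, String fathername) {
                this.name = name; this.phone = phone; this.address = address; this.fathername = fathername;
            }
            @Override public int hashCode() {
                return Objects.hash(name, phone, address, fathername);
            }
            @Override public boolean equals(Object o) {
                if (!(o instanceof Record)) return false;
                Record r = (Record) o;
                return Objects.equals(name, r.name) && Objects.equals(phone, r.phone)
                    && Objects.equals(address, r.address) && Objects.equals(fathername, r.fathername);
            }
        }

        static List<Record> dedupe(List<Record> records) {
            Set<Record> seen = new LinkedHashSet<>(records); // equals() resolves any hash collisions
            return new ArrayList<>(seen);
        }
    }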

jiggy
  • Only so long as there are no hash collisions, which would result in missing records. Depending on the design, and with ~2M records to process, collisions are possibly going to occur. – Mac Jul 21 '11 at 03:26
  • Yeah. The advice is still good, but obviously in case of a collision one still has to check for equality. A good hash function should be able to differentiate the data quite nicely (a 64-bit address space is quite large for only 2 million records). – Voo Jul 21 '11 at 03:32
  • This would work if the mapreduce were run on a single machine in a single process, and the dataset fit into ram. It's not. – Nick Johnson Jul 21 '11 at 05:03
  • @Nick, I've never actually run an MR process, but my understanding is that you break the process into chunks. You can dedupe each bucket, then merge those buckets into a bigger bucket and dedupe that. So, not really O(N) in that case, but it would still work. – jiggy Jul 21 '11 at 14:56
  • @jiggy You could - but that's not what your answer describes. :) – Nick Johnson Jul 22 '11 at 00:03
1

You definitely shouldn't be using your current approach - only one process can update an entity at one time, so your entire mapreduce is bottlenecked on that one entity. Further, mapreduce doesn't currently allow you to specify the ordering of a result set, so you've got no guarantee you'll find all (or even most) duplicates.

For now, your best option is probably to build your own solution. Using cursors, perform a query on the kind sorted by the field you want to deduplicate, and scan over it, checking for duplicates and removing them (in batches, to reduce RPCs) as you encounter them. When you need to chain another task (due to the 10 minute task limit), use a cursor to ensure the new task picks up where you left off.
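
A rough sketch of one such batch with the low-level datastore API (the kind, sort property and batch size are placeholders, not a definitive implementation):

    import java.util.ArrayList;
    import java.util.List;
    import com.google.appengine.api.datastore.Cursor;
    import com.google.appengine.api.datastore.DatastoreService;
    import com.google.appengine.api.datastore.DatastoreServiceFactory;
    import com.google.appengine.api.datastore.Entity;
    import com.google.appengine.api.datastore.FetchOptions;
    import com.google.appengine.api.datastore.Key;
    import com.google.appengine.api.datastore.Query;
    import com.google.appengine.api.datastore.QueryResultList;

    // One batch of the cursor-based scan: sort on the dedupe field, compare each
    // entity with the previous one, and batch the deletes to reduce RPCs.
    public class DedupeScan {
        public Cursor processBatch(Cursor startCursor, String previousValue) {
            DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
            Query q = new Query("Record").addSort("phone");
            FetchOptions opts = FetchOptions.Builder.withLimit(500);
            if (startCursor != null) {
                opts.startCursor(startCursor);
            }
            QueryResultList<Entity> batch = ds.prepare(q).asQueryResultList(opts);

            List<Key> duplicates = new ArrayList<Key>();
            for (Entity e : batch) {
                String value = (String) e.getProperty("phone");
                if (value != null && value.equals(previousValue)) {
                    duplicates.add(e.getKey()); // same sort value as the previous record
                }
                previousValue = value;
            }
            ds.delete(duplicates); // batched delete to cut down on RPCs

            // Hand this cursor (and the last value seen) to the next task so it resumes here;
            // in a real chain you would serialise both into the next task's payload.
            return batch.getCursor();
        }
    }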

If you want to parallelize this, you can, by having each shard start by skipping over records until it discovers a change in the value you're deduplicating on, and starting from there. At the end of a shard, wait until you reach the end of a group before stopping. This way, you make sure you don't miss removing duplicates positioned on the edge of a shard boundary.

Nick Johnson
  • I'm not familiar with Google's app-engine but you can definitely specify the ordering of result sets with hadoop. – cftarnas Jul 22 '11 at 05:47
@nick: Your approach definitely sounds good, I will surely try it. One question: theoretically speaking, if I can create ~2M mappers and run them simultaneously, where each mapper takes a unique record as input and compares it with the rest, will GAE allow this? – charming30 Jul 22 '11 at 19:37
  • @cftarnas Hadoop's and App Engine's implementations of mapreduce are entirely separate. – Nick Johnson Jul 24 '11 at 01:58
@charming30 Obviously you won't be able to run 2 million mapper tasks simultaneously - App Engine will run as many as it can at once. You'd be better off breaking the work into larger chunks which each do many records, as mapreduce does. – Nick Johnson Jul 24 '11 at 01:58
  • @nick True - which was why I was clarifying that mapreduce inherently does not have that limitation (as he has hadoop as a tag as well). – cftarnas Jul 25 '11 at 18:02
0

Here is a solution based on a hashed self-join with MapReduce. It can also do fuzzy duplicate matching with an edit-distance algorithm. You can choose the fields from the record that you want to use for duplicate detection. The reducer will output a duplicate score.

https://pkghosh.wordpress.com/2013/09/09/identifying-duplicate-records-with-fuzzy-matching/
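
As a generic illustration of the kind of fuzzy field comparison such a job can use (this is not the code from the linked post, just a minimal Levenshtein-based similarity sketch):

    // Turns plain Levenshtein edit distance into a 0..1 similarity score per field;
    // a duplicate score for a record pair could then average the per-field similarities.
    public class FuzzyMatch {
        static int editDistance(String a, String b) {
            int[][] d = new int[a.length() + 1][b.length() + 1];
            for (int i = 0; i <= a.length(); i++) d[i][0] = i;
            for (int j = 0; j <= b.length(); j++) d[0][j] = j;
            for (int i = 1; i <= a.length(); i++) {
                for (int j = 1; j <= b.length(); j++) {
                    int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                    d[i][j] = Math.min(Math.min(d[i - 1][j] + 1, d[i][j - 1] + 1), d[i - 1][j - 1] + cost);
                }
            }
            return d[a.length()][b.length()];
        }

        /** Similarity in [0,1]: 1.0 means the two field values are identical. */
        static double similarity(String a, String b) {
            int maxLen = Math.max(a.length(), b.length());
            return maxLen == 0 ? 1.0 : 1.0 - (double) editDistance(a, b) / maxLen;
        }
    }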

Pranab