42

I have a long history with relational databases, but I'm new to MongoDB and MapReduce, so I'm almost positive I must be doing something wrong. I'll jump right into the question. Sorry if it's long.

I have a database table in MySQL that tracks the number of member profile views for each day. For testing it has 10,000,000 rows.

CREATE TABLE `profile_views` (
  `id` int(10) unsigned NOT NULL auto_increment,
  `username` varchar(20) NOT NULL,
  `day` date NOT NULL,
  `hits` int(10) unsigned default '0',
  PRIMARY KEY  (`id`),
  UNIQUE KEY `username` (`username`,`day`),
  KEY `day` (`day`)
) ENGINE=InnoDB;

Typical data might look like this.

+--------+----------+------------+------+
| id     | username | day        | hits |
+--------+----------+------------+------+
| 650001 | Joe      | 2010-07-10 |    1 |
| 650002 | Jane     | 2010-07-10 |    2 |
| 650003 | Jack     | 2010-07-10 |    3 |
| 650004 | Jerry    | 2010-07-10 |    4 |
+--------+----------+------------+------+

I use this query to get the top 5 most viewed profiles since 2010-07-16.

SELECT username, SUM(hits)
FROM profile_views
WHERE day > '2010-07-16'
GROUP BY username
ORDER BY hits DESC
LIMIT 5\G

This query completes in under a minute. Not bad!

Now moving on to the world of MongoDB. I set up a sharded environment using 3 servers: M, S1, and S2. I used the following commands to set the rig up (note: I've obscured the IP addresses).

S1 => 127.20.90.1
./mongod --fork --shardsvr --port 10000 --dbpath=/data/db --logpath=/data/log

S2 => 127.20.90.7
./mongod --fork --shardsvr --port 10000 --dbpath=/data/db --logpath=/data/log

M => 127.20.4.1
./mongod --fork --configsvr --dbpath=/data/db --logpath=/data/log
./mongos --fork --configdb 127.20.4.1 --chunkSize 1 --logpath=/data/slog

Once those were up and running, I hopped on server M, and launched mongo. I issued the following commands:

use admin
db.runCommand( { addshard : "127.20.90.1:10000", name: "M1" } );
db.runCommand( { addshard : "127.20.90.7:10000", name: "M2" } );
db.runCommand( { enablesharding : "profiles" } );
db.runCommand( { shardcollection : "profiles.views", key : {day : 1} } );
use profiles
db.views.ensureIndex({ hits: -1 });

I then imported the same 10,000,000 rows from MySQL, which gave me documents that look like this:

{
    "_id" : ObjectId("4cb8fc285582125055295600"),
    "username" : "Joe",
    "day" : "Fri May 21 2010 00:00:00 GMT-0400 (EDT)",
    "hits" : 16
}

Now for the real meat and potatoes: my map and reduce functions. Back on server M, in the shell, I set up the query and execute it like this.

use profiles;
var start = new Date(2010, 7, 16);
var map = function() {
    emit(this.username, this.hits);
}
var reduce = function(key, values) {
    var sum = 0;
    for(var i in values) sum += values[i];
    return sum;
}
res = db.views.mapReduce(
    map,
    reduce,
    {
        query : { day: { $gt: start }}
    }
);
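
For readers new to the pattern, here is a minimal in-memory sketch of what the map and reduce functions above compute. It runs in plain JavaScript without MongoDB. The sample documents and the top-level `emit` helper are made up for illustration, and the cutoff uses month index 6 so that it really means July 16.

```javascript
// Made-up sample documents standing in for profiles.views.
var docs = [
  { username: "Joe",  day: new Date(2010, 6, 17), hits: 16 },
  { username: "Jane", day: new Date(2010, 6, 18), hits: 2 },
  { username: "Joe",  day: new Date(2010, 6, 19), hits: 4 },
  { username: "Jane", day: new Date(2010, 6, 10), hits: 7 }  // before the cutoff
];

// Stand-in for the server's emit(): collects emitted values per key.
var groups = {};
function emit(key, value) {
  if (!groups[key]) groups[key] = [];
  groups[key].push(value);
}

// Same logic as the map and reduce functions in the question.
var map = function () { emit(this.username, this.hits); };
var reduce = function (key, values) {
  var sum = 0;
  for (var i = 0; i < values.length; i++) sum += values[i];
  return sum;
};

// Equivalent of query: { day: { $gt: start } }, then map, then reduce.
var start = new Date(2010, 6, 16); // July 16, 2010 (months are zero-based)
docs
  .filter(function (doc) { return doc.day > start; })
  .forEach(function (doc) { map.call(doc); });

var results = Object.keys(groups).map(function (key) {
  return { _id: key, value: reduce(key, groups[key]) };
});
console.log(results);
```

Each output document has only an `_id` (the username) and a `value` (the summed hits), which matters when sorting the result collection.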

And here's where I run into problems. This query took over 15 minutes to complete! The MySQL query took under a minute. Here's the output:

{
        "result" : "tmp.mr.mapreduce_1287207199_6",
        "shardCounts" : {
                "127.20.90.7:10000" : {
                        "input" : 4917653,
                        "emit" : 4917653,
                        "output" : 1105648
                },
                "127.20.90.1:10000" : {
                        "input" : 5082347,
                        "emit" : 5082347,
                        "output" : 1150547
                }
        },
        "counts" : {
                "emit" : NumberLong(10000000),
                "input" : NumberLong(10000000),
                "output" : NumberLong(2256195)
        },
        "ok" : 1,
        "timeMillis" : 811207,
        "timing" : {
                "shards" : 651467,
                "final" : 159740
        },
}

Not only did it take forever to run, but the results don't even seem to be correct.

db[res.result].find().sort({ hits: -1 }).limit(5);
{ "_id" : "Joe", "value" : 128 }
{ "_id" : "Jane", "value" : 2 }
{ "_id" : "Jerry", "value" : 2 }
{ "_id" : "Jack", "value" : 2 }
{ "_id" : "Jessy", "value" : 3 }

I know those value numbers should be much higher.
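
A small sketch of why the ordering above looks wrong, assuming (as a comment on this question points out) that the output documents only carry `_id` and `value`. Sorting on a `hits` field that does not exist cannot rank them, while sorting on `value` does. The array below is just the five documents printed above, sorted in plain JavaScript.

```javascript
// The five output documents shown above.
var output = [
  { _id: "Joe",   value: 128 },
  { _id: "Jane",  value: 2 },
  { _id: "Jerry", value: 2 },
  { _id: "Jack",  value: 2 },
  { _id: "Jessy", value: 3 }
];

// Plain-JavaScript equivalent of sorting the result collection on "value", descending.
var top = output
  .slice()
  .sort(function (a, b) { return b.value - a.value; })
  .slice(0, 5);
console.log(top);

// In the mongo shell the corresponding call would be:
//   db[res.result].find().sort({ value: -1 }).limit(5);
```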

My understanding of the whole MapReduce paradigm is that the task of performing this query should be split between all shard members, which should increase performance. I waited until Mongo was done distributing the documents between the two shard servers after the import. Each had almost exactly 5,000,000 documents when I started this query.

So I must be doing something wrong. Can anyone give me any pointers?

Edit: Someone on IRC mentioned adding an index on the day field, but as far as I can tell that was done automatically by MongoDB.

mellowsoon
  • Gah.. Just realized one reason why the results are incorrect. I should have been sorting on "value" rather than "hits". – mellowsoon Oct 16 '10 at 06:15
  • 2
    One problem is that when you import your data into Mongo, the 'day' value is a giant string, but in mysql, it is a date (integer). When you put your data into mongo, make sure to store it as a Date type. – Clint Apr 13 '11 at 22:39
  • you might also separate date and time field, and store the date as string "20110101" or integer 20110101 and index based on date – Bahadir Cambel Sep 29 '11 at 18:15
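
The two comments above can be illustrated with a short sketch. It assumes the `day` field really was imported as a display string, which the thread does not confirm. The `toDayKey` helper is hypothetical and implements the integer form (for example 20110101) suggested in the last comment.

```javascript
// Display strings compare character by character, not chronologically.
var may  = "Fri May 21 2010 00:00:00 GMT-0400 (EDT)";
var july = "Fri Jul 16 2010 00:00:00 GMT-0400 (EDT)";
var stringSaysMayIsLater = may > july; // true, because "M" sorts after "J"

// Hypothetical helper: turn a Date into a sortable integer such as 20100521.
function toDayKey(date) {
  return date.getFullYear() * 10000 + (date.getMonth() + 1) * 100 + date.getDate();
}

var mayKey  = toDayKey(new Date(2010, 4, 21)); // months are zero-based: 4 is May
var julyKey = toDayKey(new Date(2010, 6, 16)); // 6 is July

console.log(stringSaysMayIsLater, mayKey, julyKey, mayKey < julyKey);
```

Either a real Date value or an integer key like this sorts in calendar order, so a range condition on an index over it behaves as expected.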

4 Answers

53

Excerpts from MongoDB: The Definitive Guide (O'Reilly):

The price of using MapReduce is speed: group is not particularly speedy, but MapReduce is slower and is not supposed to be used in “real time.” You run MapReduce as a background job, it creates a collection of results, and then you can query that collection in real time.

options for map/reduce:

"keeptemp" : boolean 
If the temporary result collection should be saved when the connection is closed. 

"output" : string 
Name for the output collection. Setting this option implies keeptemp : true. 
nonopolarity
  • 9
    I think I misunderstood the purpose of MapReduce. I thought it was used to process a large amount of data faster than alternatives. I think I see now that it's more about the ability to process **huge** amounts of data that would otherwise be impossible to process on a single machine, and speed isn't a factor. – mellowsoon Oct 17 '10 at 03:22
  • 6
    @mellowsoon, of course the purpose of mapreduce is to process a large or huge amount of data fast. It is just MongoDB's implementation that isn't very fast. – TTT Oct 19 '10 at 21:00
  • @TTT - Thank you! Right now I'm thinking mongodb is still the right choice for the type of data we're trying to save, but it looks like I might have to use some other mapreduce technologies to actually crunch the data. – mellowsoon Oct 19 '10 at 21:39
  • 1
    Hadoop is perfect for this; if you don't like their Java interface, you could write map/reduce in other programming languages using Hadoop streaming. Hadoop is as parallelizable/scalable as it comes, and you can make it "faster" by adding more hardware. – Suman Apr 13 '12 at 17:45
  • 4
    The MapReduce implementation in MongoDB has little to do with map reduce apparently. Because for all I read, it is single-threaded, while map-reduce is meant to be used highly parallel on a cluster. – Has QUIT--Anony-Mousse Jun 16 '12 at 09:26
  • @Anony-Mousse, it is indeed single threaded but you can shard your data to multiple machines. I've read that some people also have multiple shards on the same machine. That way your MapReduce becomes multi threaded. I haven't tried this myself. – TTT Jul 25 '12 at 12:51
  • That still is just a nasty hack. MapReduce performance doesn't come from having a "map" and a "reduce" function, but from having a clever data distribution and load balancing system to run on, that allows you to fully utilize your computational power. When using sharding, chances are that your computation takes ages on one shard and has nothing to do on the other. – Has QUIT--Anony-Mousse Jul 25 '12 at 14:55
  • 1
    I think the parameter should be named "out", not "output", according to http://docs.mongodb.org/manual/applications/map-reduce/. – Marian Jan 26 '13 at 23:25
  • 1
    @Marian - as of 1.8 you're correct, and the keeptemp option is gone. Heads up to future readers. See here for details: [Release Notes for MongoDB 1.8](http://docs.mongodb.org/manual/release-notes/1.8/) – Kavi Siegel Oct 28 '14 at 03:21
29

Maybe I'm too late, but...

First, you are querying the collection to fill the MapReduce without an index. You should create an index on "day".

MongoDB MapReduce is single threaded on a single server, but parallelizes on shards. The data in mongo shards are kept together in contiguous chunks sorted by sharding key.

As your sharding key is "day", and you are querying on it, you are probably using only one of your three servers. The sharding key is only used to spread the data. Map Reduce will query using the "day" index on each shard, and will be very fast.

Add something in front of the day key to spread the data. The username can be a good choice.

That way the Map Reduce will be launched on all servers, hopefully reducing the time by a factor of three.

Something like this:

use admin
db.runCommand( { addshard : "127.20.90.1:10000", name: "M1" } );
db.runCommand( { addshard : "127.20.90.7:10000", name: "M2" } );
db.runCommand( { enablesharding : "profiles" } );
db.runCommand( { shardcollection : "profiles.views", key : {username : 1,day: 1} } );
use profiles
db.views.ensureIndex({ hits: -1 });
db.views.ensureIndex({ day: -1 });

I think with those additions, you can match MySQL's speed, or even beat it.

Also, it is better not to use it in real time. If your data doesn't need to be "minutely" precise, schedule a map reduce task every now and then and use the result collection.
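
The shard-key reasoning in this answer can be sketched with a toy model. This is not MongoDB code: the chunk boundaries, the ISO date strings, and the `shardsForDayQuery` helper are all made up for illustration, and the layout assumes each shard owns one contiguous range, which a real balancer does not guarantee.

```javascript
// Toy model: each chunk covers [min, max) of the shard key and lives on one shard.
var chunksByDay = [
  { min: "2010-01-01", max: "2010-07-01", shard: "M1" },
  { min: "2010-07-01", max: "2011-01-01", shard: "M2" }
];
var chunksByUsername = [
  { min: "",  max: "M",      shard: "M1" }, // usernames sorting before "M"
  { min: "M", max: "\uffff", shard: "M2" }  // everything else
];

// Which shards must a "day > start" query visit?
// canTarget is true only when the chunk ranges are expressed in terms of day,
// i.e. when the shard key starts with day. Otherwise every shard is asked.
function shardsForDayQuery(chunks, canTarget, start) {
  var shards = {};
  chunks.forEach(function (chunk) {
    if (!canTarget || chunk.max > start) shards[chunk.shard] = true;
  });
  return Object.keys(shards).sort();
}

var withDayKey = shardsForDayQuery(chunksByDay, true, "2010-07-16");
var withUsernameKey = shardsForDayQuery(chunksByUsername, false, "2010-07-16");
console.log(withDayKey, withUsernameKey);
```

In this model a recent date range lands on a single shard when the key is `day`, while a key led by `username` sends the job to every shard, each of which can then use its own index on `day`.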

FrameGrace
  • 1
    Also, one last thing to point is that MongoDB asks you to make sure your indexes can be kept in memory; running db.views.stats() tells you the index size. This is what helps you optimize and maximize performance. – Krynble Jul 01 '13 at 16:43
6

You are not doing anything wrong. (Besides sorting on the wrong value as you already noticed in your comments.)

MongoDB map/reduce performance just isn't that great. This is a known issue; see for example http://jira.mongodb.org/browse/SERVER-1197 where a naive approach is ~350x faster than M/R.

One advantage though is that you can specify a permanent output collection name with the out argument of the mapReduce call. Once the M/R is completed the temporary collection will be renamed to the permanent name atomically. That way you can schedule your statistics updates and query the M/R output collection real-time.
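
A minimal sketch of that pattern, assuming a MongoDB version where the option is called `out` (1.8 or later, per the comments on another answer). The collection name `profile_view_totals` is hypothetical. The shell calls are guarded so the snippet also loads outside the mongo shell, where it does nothing.

```javascript
// Same map and reduce as in the question.
var map = function () { emit(this.username, this.hits); };
var reduce = function (key, values) {
  var sum = 0;
  for (var i = 0; i < values.length; i++) sum += values[i];
  return sum;
};

// "profile_view_totals" is a hypothetical collection name, not from the thread.
var options = {
  query: { day: { $gt: new Date(2010, 6, 16) } }, // July 16, 2010 (months are zero-based)
  out: "profile_view_totals"
};

// Only runs inside the mongo shell, where the global `db` exists.
if (typeof db !== "undefined") {
  // Scheduled background job: rebuilds the permanent output collection.
  db.views.mapReduce(map, reduce, options);
  // Cheap read that can be served in real time between runs.
  db.profile_view_totals.find().sort({ value: -1 }).limit(5).forEach(printjson);
}
```

The slow job and the fast read are decoupled this way: readers only ever touch the output collection.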

J-B
  • Thanks for the response. I'm going to leave the question unanswered for just a bit longer to see if anyone else has some input. This is really disappointing though. I wonder where the bottleneck is? Perhaps because MongoDB is single threaded, so the server coordinating all the shards can only go so fast? I'm also curious about the results. It appears all 10 million docs were mapped, when most should have been excluded by the query. – mellowsoon Oct 16 '10 at 18:02
  • @mellowsoon:Verify your query by doing a count on the collection with the same arguments (and remember that the month for a JS Date object is zero-based indexed). – J-B Oct 16 '10 at 18:33
  • Thanks, I'm doing that now. I've done a complete fresh install of Mongo on the 3 servers, and I'm importing the data now. Once that's done, I'll look at how the data is distributed between the shards, and pick a date range that should put half the matching docs on each shard. – mellowsoon Oct 16 '10 at 19:12
  • Just wanted to add a P.S.: WTF on months starting on zero?! – mellowsoon Oct 16 '10 at 19:27
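
The zero-based month mentioned in these comments is easy to check in plain JavaScript. The variable names below are only for illustration.

```javascript
// Month indexes run from 0 (January) to 11 (December).
var asWritten = new Date(2010, 7, 16); // what the question used: month 7 is August
var intended  = new Date(2010, 6, 16); // month 6 is July

console.log(asWritten.getMonth(), intended.getMonth()); // zero-based month indexes
console.log(asWritten > intended); // the cutoff in the question is a month later than intended
```

So the query in the question filters on August 16, 2010 rather than July 16, 2010.
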
0

Have you already tried using the Hadoop connector for MongoDB?

Look at this link here: http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

Since you are using only 3 shards, I don't know whether this approach would improve your case.