0

I have multiple time series in database in mongodb, with fields "ticker", "time", and "close" amongst other fields:

> db.bbticks.find().limit(2)
{ "_id" : ObjectId("522b2cf7d4236309a57c8f96"), "close" : 1.9432, "high" : 1.9433, "low" : 1.9426, "open" : 1.9427, "source" : "HIST", "systime" : ISODate("2013-09-07T13:41:13.383Z"), "ticker" : "USDTRY Curncy", "time" : ISODate("2013-08-01T15:14:00Z"), "type" : "BAR", "value" : 1.9432 }
{ "_id" : ObjectId("522b2cf7d4236309a57c8f97"), "close" : 1.9425, "high" : 1.9433, "low" : 1.9425, "open" : 1.9432, "source" : "HIST", "systime" : ISODate("2013-09-07T13:41:13.383Z"), "ticker" : "USDTRY Curncy", "time" : ISODate("2013-08-01T15:15:00Z"), "type" : "BAR", "value" : 1.9425 }

The time stamps are whole minutes. There are multiple timezones represented amongst the tickers, so for example, the MEXBOL Mexican stock market is open only from 13h30 UTC, whereas the FTSEMIB Italian stock market is open from 07h00 UTC. I want to bring down all the time series but only for timestamps that they all have. Here is an example:

> db.bbticks.find({ticker: "FTSEMIB Index", type: "BAR", time: {$gte: ISODate("2013-08-01")}}, {_id: 0, ticker: 1, time: 1, close: 1}).sort({time: 1}).limit(5)
{ "close" : 16565.04, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T07:00:00Z") }
{ "close" : 16585.56, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T07:01:00Z") }
{ "close" : 16583.29, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T07:02:00Z") }
{ "close" : 16578.95, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T07:03:00Z") }
{ "close" : 16587.16, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T07:04:00Z") }
> db.bbticks.find({ticker: "MEXBOL Index", type: "BAR", time: {$gte: ISODate("2013-08-01")}}, {_id: 0, ticker: 1, time: 1, close: 1}).sort({time: 1}).limit(5)
{ "close" : 41101.39, "ticker" : "MEXBOL Index", "time" : ISODate("2013-08-01T13:30:00Z") }
{ "close" : 41099.25, "ticker" : "MEXBOL Index", "time" : ISODate("2013-08-01T13:31:00Z") }
{ "close" : 41126.17, "ticker" : "MEXBOL Index", "time" : ISODate("2013-08-01T13:32:00Z") }
{ "close" : 41137.03, "ticker" : "MEXBOL Index", "time" : ISODate("2013-08-01T13:33:00Z") }
{ "close" : 41173.89, "ticker" : "MEXBOL Index", "time" : ISODate("2013-08-01T13:34:00Z") }

as you can see, for ticks on or after 1 August 2013, FTSEMIB starts at 07h00 and MEXBOL starts at 13h30. Data does exist for FTSEMIB after 13h30 too:

> db.bbticks.find({ticker: "FTSEMIB Index", type: "BAR", time: {$gte: ISODate("2013-08-01T13:30:00")}}, {_id: 0, ticker: 1, time: 1, close: 1}).sort({time: 1}).limit(5)
{ "close" : 16739.41, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T13:30:00Z") }
{ "close" : 16748.21, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T13:31:00Z") }
{ "close" : 16750.76, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T13:32:00Z") }
{ "close" : 16747.89, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T13:33:00Z") }
{ "close" : 16746.66, "ticker" : "FTSEMIB Index", "time" : ISODate("2013-08-01T13:34:00Z") }

So basically, wherever there is a "time" field that exists for both tickers, I want only those closes returned. There may be multiple time series in the query (not just two), and there may be missing values within otherwise contiguous blocks of series (so for example, at 14h31 on 1 August, one series might not have a value for that time, in which case no series must be returned for that time).

Basically, I want to compare time series, I need the series returned only for timestamps that they all have.

Finally, ideally I would prefer to use the aggregation pipeline framework, rather than Map Reduce, if possible.

Thomas Browne
  • 23,824
  • 32
  • 78
  • 121

1 Answer

1

See if the following is in line with what you want to accomplish:

// A timestamp should survive only when EVERY series has a bar for it, so
// count the distinct tickers up front and match the per-minute group count
// against that total. The original `count: { $gt: 1 }` kept any timestamp
// shared by merely two or more tickers.
// (Assumes at most one bar per ticker per minute -- TODO confirm.)
var numTickers = db.bbticks.distinct("ticker").length;

db.bbticks.aggregate(
[
 // Restrict to the window of interest before grouping.
 { $match: { time: { $gte: ISODate("2013-08-01") } } },
 // One group per minute; collect each ticker's close for that minute
 // and count how many series contributed a bar.
 { $group: { _id: "$time", count: {$sum: 1}, tickers: { $push: { "ticker": "$ticker" , "close": "$close" } } } } ,
 // Keep only the timestamps present in every series.
 { $match: { count: numTickers } }
]
)

-- break --

For the map-reduce, you could try the following (not very elegant, I think there are better ways but just some ideas to get you started). Also, as this will be a time series that grows, chances are you might want to use an incremental map-reduce (http://docs.mongodb.org/manual/tutorial/perform-incremental-map-reduce/). But the below can give you some ideas (like I said, it is ugly --- and it might be better to perform a second map-reduce operation rather than my last find statement, but up to you).

// Map phase: key each bar document by its minute timestamp and wrap the
// (ticker, close) pair in a single-element "tickers" array, so the reduce
// phase can merge all entries emitted for the same minute.
var mapFunction = function() {
                      var tick = { ticker: this.ticker, close: this.close };
                      emit( this.time, { tickers: [ tick ] } );
                  };

// Reduce phase: merge the "tickers" arrays of every value emitted for one
// timestamp into a single combined array.
//
// BUG FIX: MongoDB may invoke reduce repeatedly ("re-reduce"), feeding it
// values that are themselves earlier reduce outputs whose "tickers" arrays
// hold more than one element. The original code pushed only
// valuesArray[idx].tickers[0], silently dropping every ticker past the
// first during a re-reduce. Concatenating the whole array is safe in both
// the first-pass and re-reduce cases.
var reduceFunction = function(keyObject, valuesArray) {
                     var reducedValue = { tickers: [] };

                     for (var idx = 0; idx < valuesArray.length; idx++) {
                        reducedValue.tickers = reducedValue.tickers.concat( valuesArray[idx].tickers );
                     }

                     return reducedValue;
                  };


// Run the map-reduce over every bar on/after 1 Aug 2013, sorted by time,
// writing the per-timestamp groupings to the "mr_interim_results"
// collection for a follow-up query.
db.bbticks.mapReduce(
    mapFunction,
    reduceFunction,
    {
        query: { time: { $gte: ISODate("2013-08-01") } },
        sort:  { time: 1 },
        out:   "mr_interim_results"
    }
);

db.mr_interim_results.find( { 'value.tickers': { $not: { $size: 1 } } }  )
Kay
  • 2,942
  • 18
  • 13
  • "errmsg": "aggregation result exceeds maximum document size". Admittedly there are actually 115 series in bbticks, but since 1 August is not so long so even if I were to reduce the series number, I could be in a situation of requiring much longer history than 1 Aug so it must be robust to the 16mb doc size. Perhaps if you wouldn't mind running me through the logic of your aggregation, I might be able to play with it a bit..... – Thomas Browne Sep 09 '13 at 09:46
  • I think I may have to go with MapReduce because of this 16mb limitation: http://stackoverflow.com/questions/12337319/mongodb-aggregation-comparison-group-group-and-mapreduce. – Thomas Browne Sep 09 '13 at 10:10
  • Hi Thomas - as you stated, because of the size limit, you probably should go with MapReduce. In regards to the logic of the aggregation, I first do a $match to find all the documents that match the condition of time >= "2013-08-01". Then, these matching documents are grouped by time, keeping a count and adding a field "tickers" which contain an array of tickers & closing price associated with that time. Then, from these grouping, I do another match to return only those that have count > 1. – Kay Sep 09 '13 at 16:51
  • Hi Thomas - the development version 2.5.2 of MongoDB supports $out for aggregation which allows for output to a collection and is not under the 16mb limit. This version, which is for testing and not production purposes, may or may not work for you. In any case, the code would be db.bbticks.aggregate([ { $match: { time: { $gte: ISODate("2013-08-01") } } }, { $group: { _id: "$time", count: {$sum: 1}, tickers: { $push: { "ticker": "$ticker" , "close": "$close" } } } } , { $match: { count: { $gt: 1 } } }, { $out: "myAggResults"} ]) – Kay Sep 09 '13 at 17:21
  • Kay - I will download 2.5.2 but if you were able to point me in the right direction on MapReduce (I am a javascript / mapreduce / mongo newbie) that would be really helpful. Reason is I need this functionality to go on multiple machines so don't really want to rely on development algorithms unless I have to. That said, have tested your aggregation query with a smaller dataset and it seems to work well. – Thomas Browne Sep 10 '13 at 13:01
  • Hi Thomas -- I edited my answer to include a map-reduce (it's ugly but it was just a quick one off the top of my head, sorry). – Kay Sep 10 '13 at 15:36
  • Went with 2.5.2 in the end, and it works well, but I am getting Exception "errmsg" : "exception: Exceeded memory limit for $group, but didn't allow external sort" "code" : 16945. Wondering if, without taking too much time, you know if there is another thing I can add to sort out that problem. – Thomas Browne Sep 12 '13 at 16:17
  • Hey Thomas -- if you don't already, you might want the index {"time" : 1, "ticker" : 1, "close" : 1 } ? You might want to try the index before trying other development features of the 2.5.2. – Kay Sep 13 '13 at 03:51
  • Still getting the same error, Kay, but it works well for smaller size queries. I will play around with your two answers until I have something that works on the (very large) datasets that I have, at which time I will post detail about how I got there. Many thanks for your help. – Thomas Browne Sep 13 '13 at 06:25