Perform Incremental Map-Reduce
Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.
To perform incremental map-reduce:
- Run a map-reduce job over the current collection and output the result to a separate collection.
- When you have more data to process, run a subsequent map-reduce job with:
  - the query parameter, which specifies conditions that match only the new documents.
  - the out parameter, which specifies the reduce action to merge the new results into the existing output collection.
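The two parameters in the list above can be sketched as a plain options object. This is only an illustration: the helper name incrementalOptions is hypothetical, and it assumes that each document carries a timestamp field named ts that can be used to tell new documents from already processed ones.

```javascript
// Hypothetical helper (not part of MongoDB): builds the options for a
// follow-up map-reduce run. Assumes documents have a timestamp field `ts`.
function incrementalOptions(since, outputCollection) {
  return {
    // Match only documents newer than the last processed point in time.
    query: { ts: { $gt: since } },
    // Merge the new results into the existing output collection via reduce.
    out: { reduce: outputCollection }
  };
}

var options = incrementalOptions(new Date("2011-11-05T00:00:00Z"), "session_stat");
console.log(JSON.stringify(options));
```

In the mongo shell, an object of this shape would be passed as the third argument to db.collection.mapReduce(), together with a finalize function if one is used.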
Consider the following example, in which you schedule a map-reduce operation on a sessions collection to run at the end of each day.
Data Setup
The sessions collection contains documents that log users’ sessions each day, for example:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
Initial Map-Reduce of Current Collection
Run the first map-reduce operation as follows:
Define the map function that maps the userid to an object that contains the fields userid, total_time, count, and avg_time:
var mapFunction = function() {
    var key = this.userid;
    var value = {
        userid: this.userid,
        total_time: this.length,
        count: 1,
        avg_time: 0
    };
    emit( key, value );
};
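To see what the map function emits for a single document, the following minimal sketch runs it in plain JavaScript, outside MongoDB. The emit stub and the emitted array are assumptions made for this illustration; in a real map-reduce job the server supplies emit and binds this to each input document.

```javascript
// Hypothetical stand-in for the emit() that MongoDB provides to map functions.
var emitted = [];
function emit(key, value) {
  emitted.push({ key: key, value: value });
}

var mapFunction = function() {
  var key = this.userid;
  var value = { userid: this.userid, total_time: this.length, count: 1, avg_time: 0 };
  emit( key, value );
};

// Bind `this` to one sample document from the sessions data.
mapFunction.call({ userid: "a", ts: new Date("2011-11-03T14:17:00Z"), length: 95 });

console.log(JSON.stringify(emitted[0]));
// {"key":"a","value":{"userid":"a","total_time":95,"count":1,"avg_time":0}}
```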
Define the corresponding reduce function with two arguments, key and values, to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects mapped to the userid in the mapFunction:
var reduceFunction = function(key, values) {
    var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
    values.forEach( function(value) {
        reducedObject.total_time += value.total_time;
        reducedObject.count += value.count;
    } );
    return reducedObject;
};
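MongoDB may call the reduce function more than once for the same key, and the reduce output action feeds previously stored results back into it, so the returned object has to have the same shape as the emitted values. The following minimal sketch in plain JavaScript, using the three session lengths of user "a" from this example, checks that reducing in one pass gives the same totals as reducing a partial result again:

```javascript
var reduceFunction = function(key, values) {
  var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};

// Values as the map function would emit them for user "a" (lengths 95, 105, 100).
var v1 = { userid: "a", total_time: 95, count: 1, avg_time: 0 };
var v2 = { userid: "a", total_time: 105, count: 1, avg_time: 0 };
var v3 = { userid: "a", total_time: 100, count: 1, avg_time: 0 };

// Reduce everything in one pass.
var onePass = reduceFunction("a", [v1, v2, v3]);

// Reduce the first two values, then feed that partial result back in with the third.
var partial = reduceFunction("a", [v1, v2]);
var twoPass = reduceFunction("a", [partial, v3]);

console.log(onePass.total_time === twoPass.total_time); // true
console.log(onePass.count === twoPass.count);           // true
```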
Define the finalize function with two arguments, key and reducedValue. The function modifies the reducedValue document to set the avg_time field (total_time divided by count) and returns the modified document:
var finalizeFunction = function (key, reducedValue) {
    if (reducedValue.count > 0)
        reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
};
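As a quick check of the finalize step, the following sketch applies the function in plain JavaScript to a hand-written reduced value. The input objects are illustrative; the first corresponds to user "a" after two sessions of 95 and 105.

```javascript
var finalizeFunction = function (key, reducedValue) {
  if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  return reducedValue;
};

// Reduced value for user "a" after two sessions (95 + 105).
var result = finalizeFunction("a", { userid: "a", total_time: 200, count: 2, avg_time: 0 });
console.log(result.avg_time); // 100

// With count 0 the guard skips the division, so avg_time stays 0.
var empty = finalizeFunction("x", { userid: "x", total_time: 0, count: 0, avg_time: 0 });
console.log(empty.avg_time); // 0
```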
Perform map-reduce on the sessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection named session_stat. If the session_stat collection already exists, the operation will replace the contents:
db.sessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
        out: "session_stat",
        finalize: finalizeFunction
    }
)
Subsequent Incremental Map-Reduce
Later, as the sessions collection grows, you can run additional map-reduce operations. For example, add new documents to the sessions collection:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the collection session_stat, but reduce the contents with the results of the incremental map-reduce:
db.sessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
        query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
        out: { reduce: "session_stat" },
        finalize: finalizeFunction
    }
);
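The effect of the two runs can be approximated in memory. The sketch below is plain JavaScript, not MongoDB code: the helper runMapReduce and the names mapSession, reduceSessions, and finalizeSession are hypothetical, and the helper only mimics the reduce output action by reducing stored values together with newly mapped ones. Under these assumptions, the incremental result matches a full re-run over all twelve documents.

```javascript
function mapSession(doc) {
  return { key: doc.userid,
           value: { userid: doc.userid, total_time: doc.length, count: 1, avg_time: 0 } };
}
function reduceSessions(key, values) {
  var out = { userid: key, total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function(v) { out.total_time += v.total_time; out.count += v.count; });
  return out;
}
function finalizeSession(key, v) {
  if (v.count > 0) v.avg_time = v.total_time / v.count;
  return v;
}

// Hypothetical helper: map, group by key, reduce, finalize. When `existing`
// is given, its stored values are reduced together with the new ones.
function runMapReduce(docs, existing) {
  var groups = {};
  Object.keys(existing || {}).forEach(function(k) { groups[k] = [existing[k]]; });
  docs.forEach(function(doc) {
    var pair = mapSession(doc);
    (groups[pair.key] = groups[pair.key] || []).push(pair.value);
  });
  var result = {};
  Object.keys(groups).forEach(function(k) {
    result[k] = finalizeSession(k, reduceSessions(k, groups[k]));
  });
  return result;
}

// Session lengths from the data setup (two days) and the later additions.
var firstTwoDays = [
  { userid: "a", length: 95 },  { userid: "b", length: 110 },
  { userid: "c", length: 120 }, { userid: "d", length: 45 },
  { userid: "a", length: 105 }, { userid: "b", length: 120 },
  { userid: "c", length: 130 }, { userid: "d", length: 65 }
];
var thirdDay = [
  { userid: "a", length: 100 }, { userid: "b", length: 115 },
  { userid: "c", length: 125 }, { userid: "d", length: 55 }
];

var sessionStat = runMapReduce(firstTwoDays);       // initial run
sessionStat = runMapReduce(thirdDay, sessionStat);  // incremental run
var fullRerun = runMapReduce(firstTwoDays.concat(thirdDay));

console.log(JSON.stringify(sessionStat.a));
// {"userid":"a","total_time":300,"count":3,"avg_time":100}
console.log(JSON.stringify(sessionStat) === JSON.stringify(fullRerun)); // true
```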