@skippy
Created August 30, 2010 15:32
m/r overview:
map: break each date/value pair from collectionA into various time buckets (multiple emits per document; see the sketch below)
reduce: merge the emitted values for each key into key => array of values
finalize: check whether a stored calc already exists in collectionB... if so, reuse some of its meta-data; populate a hash, run the calculations for each key, then insert/upsert into collectionB
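For context, the production map described above emits once per time bucket, roughly like the sketch below (the variable name and the day/hour bucket granularity are illustrative only, not the real code):

bucketing_map_sketch = <<-EOS
function() {
  // one emit per time bucket derived from the document's datetime,
  // so a single raw document contributes to several aggregates
  emit(this.datetime.getDay() + "_day_bucket", {values: [this.value]});
  emit(this.datetime.getHours() + "_hour_bucket", {values: [this.value]});
};
EOS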
Failing test:
a Ruby acceptance test running 6 threads:
- insert a bunch of records (currently one-by-one)
- run multiple map/reduce jobs concurrently
- the run hangs
require 'rubygems'
gem 'bson', '1.0.7'
gem 'bson_ext', '1.0.7'
gem 'mongo', '1.0.8'
require 'bson'
require 'mongo'
require 'benchmark'
# NOTES: on my machine (MacBook Pro, 2.66 GHz Intel Core i7 (dual-core hyperthreaded)), this works until I increase the # of threads to 3
# then it locks up... in this example mongostat doesn't show any locks being held, but currentOp() does
num_rows = 10_000
num_threads = 1
conn = Mongo::Connection.new 'localhost', nil, :pool_size => num_threads, :timeout => 10
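# the driver keeps a pool of :pool_size sockets per connection; setting it to
# num_threads lets each worker thread below hold its own socket while its
# map/reduce call is in flight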
conn.drop_database('mr_failure_test_db') #start fresh!
db = conn.db('mr_failure_test_db')
raw_data_collection = db.collection('raw_data')
calculations_collection = db.collection('calculations')
# STEP 1:
# let's insert a bunch of raw data: random integers with datetimes ranging from ~now back to ~6.5 days ago
puts "Step 1: Inserting rows"
num_rows.times do |i|
  # insert some random values, with datetimes ranging from now back about 6.5 days
  raw_data_collection.insert({'value' => rand(1000), 'datetime' => Time.now - rand(500_000)}, :safe => true)
  puts " inserted #{(i/num_rows.to_f * 100).round}% of the rows" if i > 0 && i % (num_rows/10) == 0
end
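# (sketch) Collection#insert also accepts an array of documents, so the one-by-one
# inserts above could be batched if insert speed ever becomes the bottleneck, e.g.:
#
#   docs = Array.new(1_000) { {'value' => rand(1000), 'datetime' => Time.now - rand(500_000)} }
#   raw_data_collection.insert(docs, :safe => true)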
# STEP 2:
# Let's set up the map/reduce
# map: emit a few times for various time buckets
# reduce: just merge the various time bucket values into one array
# finalize: run a simple calculation and insert that into another collection
#
# and run this over multiple threads
puts "STEP 2: running m/r against the collection over #{num_threads} threads"
sys_js_collection = db.collection('system.js')
value_cleanup = <<-EOS
function(val) {
  return isNaN(val) ? null : val;
};
EOS
sys_js_collection.save({:_id => 'value_cleanup', :value => BSON::Code.new(value_cleanup.strip)}, :safe => true)
test_map_func = <<-EOS
function(rawData) {
  emit(rawData.datetime.getDay() + "_test_day", {values: [rawData.value]});
};
EOS
sys_js_collection.save({:_id => 'test_map_func', :value => BSON::Code.new(test_map_func.strip)}, :safe => true)
test_reduce_func = <<-EOS
function(key, values) {
  // first pass: count how many values will be merged for this key
  var totalSize = 0;
  for (var i = 0; i < values.length; i++) {
    totalSize += values[i].values.length;
  }
  // second pass: flatten every partial result into a single array
  var results = {values: new Array(totalSize)};
  var loc = 0;
  for (var i = 0; i < values.length; i++) {
    for (var j = 0; j < values[i].values.length; j++) {
      results.values[loc++] = values[i].values[j];
    }
  }
  return results;
};
EOS
sys_js_collection.save({:_id => 'test_reduce_func', :value => BSON::Code.new(test_reduce_func.strip)}, :safe => true)
test_finalize_func = <<-EOS
function(key, results) {
  var num_results = results.values.length;
  if (num_results == 0) {
    return {};
  }
  // look up an existing calc for this key so its meta-data is carried forward
  var calc = db.calculations.findOne({'key': key}) || {};
  var sum = 0;
  var n = 0;
  for (var i = 0; i < num_results; i++) {
    var val = results.values[i];
    n += 1;
    sum += val;
  }
  calc.mean = n > 0 ? value_cleanup(sum / n) : 0;
  calc.n = value_cleanup(n);
  calc.sum = value_cleanup(sum);
  calc.key = key;
  calc.processed_at = new Date(new Date().toUTCString());
  db.calculations.save(calc);
  return calc;
};
EOS
sys_js_collection.save({:_id => 'test_finalize_func', :value => BSON::Code.new(test_finalize_func.strip)}, :safe => true)
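# NOTE (assumption, not verified): test_finalize_func reads and writes db.calculations
# from inside server-side JS while the map/reduce job is still running; if that turns
# out to be part of the hang, the same mean/n/sum math could instead be done client-side
# from the result collection that map_reduce returns.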
map = BSON::Code.new("function(){return test_map_func(this)}")
reduce = BSON::Code.new("function(key,values){return test_reduce_func(key,values)}")
finalize = BSON::Code.new("function(key, results){return test_finalize_func(key, results)}")
mr_opts = {
  :finalize => finalize,
  :verbose => true,
  :keeptemp => false
}
overall_runtime = Benchmark.realtime do
  threads = []
  begin
    threads = (0..(num_threads-1)).map do |i|
      Thread.new do
        puts "launching thread: #{i}"
        raw_data_collection.map_reduce(map, reduce, mr_opts)
        puts "finished with thread: #{i}"
      end
    end
  ensure
    threads.each { |t| t.join }
  end
end
puts "M/R finished for #{num_threads} threads in #{overall_runtime.round} secs. Inserted #{calculations_collection.count} calculations"
// NOTE: some names in the shell output below have been changed... they appear in ALL CAPS
MongoDB shell version: 1.6.1
connecting to: test
> db.currentOp()
{
  "inprog" : [
    {
      "opid" : 127465,
      "active" : false,
      "lockType" : "read",
      "waitingForLock" : true,
      "op" : "query",
      "ns" : "?MY_DB-test.COLLECTION_A",
      "query" : {
        "$query" : {
          "META_DATA_HASH.OWNER_ID" : 7,
          "relative_time" : {
            "$gte" : "Sun Jan 31 2010 16:00:00 GMT-0800 (PST)",
            "$lte" : "Sun Feb 28 2010 15:59:59 GMT-0800 (PST)"
          }
        },
        "$orderby" : {
          "relative_time" : -1
        }
      },
      "client" : "127.0.0.1:51340",
      "desc" : "conn"
    },
    {
      "opid" : 127451,
      "active" : true,
      "lockType" : "read",
      "waitingForLock" : false,
      "secs_running" : 232,
      "op" : "query",
      "ns" : "MY_DB-test.COLLECTION_A",
      "query" : {
        "$msg" : "query not recording (too large)"
      },
      "client" : "127.0.0.1:51341",
      "desc" : "conn",
      "msg" : "m/r: (1/3) emit phase 300/319 94%"
    },
    {
      "opid" : 127464,
      "active" : false,
      "lockType" : "read",
      "waitingForLock" : true,
      "op" : "query",
      "ns" : "?MY_DB-test.COLLECTION_A",
      "query" : {
        "$query" : {
          "META_DATA_HASH.OWNER_ID" : 9,
          "relative_time" : {
            "$gte" : "Sun Jan 31 2010 16:00:00 GMT-0800 (PST)",
            "$lte" : "Sun Feb 28 2010 15:59:59 GMT-0800 (PST)"
          }
        },
        "$orderby" : {
          "relative_time" : -1
        }
      },
      "client" : "127.0.0.1:51305",
      "desc" : "conn"
    },
    {
      "opid" : 127463,
      "active" : false,
      "lockType" : "write",
      "waitingForLock" : true,
      "op" : "insert",
      "ns" : "?",
      "client" : "0.0.0.0:0",
      "desc" : "conn"
    },
    {
      "opid" : 127466,
      "active" : false,
      "lockType" : "read",
      "waitingForLock" : true,
      "op" : "query",
      "ns" : "?MY_DB-test.COLLECTION_A",
      "query" : {
        "$query" : {
          "starts_at" : {
            "$gte" : "Tue Jan 19 2010 16:00:00 GMT-0800 (PST)"
          },
          "META_DATA_HASH" : {
            "OWNER_ID" : 8
          },
          "range" : "daily",
          "n" : {
            "$gt" : 0
          },
          "stops_at" : {
            "$lte" : "Tue Apr 20 2010 17:00:00 GMT-0700 (PDT)"
          }
        },
        "$orderby" : {
          "starts_at" : -1
        }
      },
      "client" : "127.0.0.1:51342",
      "desc" : "conn"
    },
    {
      "opid" : 127467,
      "active" : false,
      "lockType" : "read",
      "waitingForLock" : true,
      "op" : "query",
      "ns" : "MY_DB-test.COLLECTION_A",
      "query" : {
        "$query" : {
          "META_DATA_HASH.OWNER_ID" : 6,
          "relative_time" : {
            "$gte" : "Sun Jan 31 2010 16:00:00 GMT-0800 (PST)",
            "$lte" : "Sun Feb 28 2010 15:59:59 GMT-0800 (PST)"
          }
        },
        "$orderby" : {
          "relative_time" : -1
        }
      },
      "client" : "127.0.0.1:51343",
      "desc" : "conn"
    }
  ]
}
mongostat output from the same run; note that inserts/queries drop to zero and the queue column sits at 5|4|1 from 08:22:53 onward:
insert/s query/s update/s delete/s getmore/s command/s flushes/s mapped vsize res locked % idx miss % q t|r|w conn time
225 223 0 0 0 226 0 464 2887 33 10.6 0 0|0|0 7 08:22:16
222 222 0 0 0 223 0 464 2887 33 6.7 0 0|0|0 7 08:22:17
233 232 0 0 0 234 0 464 2887 34 6.9 0 0|0|0 7 08:22:18
218 220 0 0 0 219 0 464 2888 34 9.3 0 0|0|0 7 08:22:19
225 224 0 0 0 226 0 464 2888 34 6.5 0 0|0|0 7 08:22:20
191 190 0 0 0 192 0 464 2888 34 5.4 0 0|0|0 7 08:22:21
223 226 0 0 0 224 0 464 2888 35 6.5 0 0|0|0 7 08:22:22
213 214 0 0 0 214 0 464 2888 35 10.9 0 0|0|0 7 08:22:23
220 218 0 0 0 221 0 464 2888 35 10.3 0 0|0|0 7 08:22:24
226 224 0 0 0 227 0 464 2888 35 6.7 0 0|0|0 7 08:22:25
221 222 0 0 0 222 0 464 2888 35 8.2 0 0|0|0 7 08:22:26
219 221 0 0 0 220 0 464 2888 36 8.7 0 0|0|0 7 08:22:27
199 199 0 0 0 200 0 464 2888 36 9.8 0 0|0|0 7 08:22:28
192 190 0 0 0 193 0 464 2888 36 5.8 0 0|0|0 7 08:22:29
193 194 0 0 0 194 0 464 2888 36 8.4 0 0|0|0 7 08:22:30
204 204 0 0 0 205 0 464 2888 36 11.9 0 0|0|0 7 08:22:31
176 174 0 0 0 177 0 464 2888 37 14 0 0|0|0 7 08:22:32
201 202 0 0 0 202 0 464 2888 37 10.4 0 0|0|0 7 08:22:33
212 215 0 0 0 213 0 464 2888 37 12.3 0 0|0|0 7 08:22:34
215 212 0 0 0 216 0 464 2888 37 8.4 0 0|0|0 7 08:22:35
insert/s query/s update/s delete/s getmore/s command/s flushes/s mapped vsize res locked % idx miss % q t|r|w conn time
215 217 0 0 0 216 0 464 2888 37 9.7 0 0|0|0 7 08:22:36
214 212 0 0 0 215 0 464 2888 38 10.9 0 0|0|0 7 08:22:37
224 227 0 0 0 225 0 464 2888 38 9 0 0|0|0 7 08:22:38
223 220 0 0 0 224 0 464 2888 38 9 0 0|0|0 7 08:22:39
206 207 0 0 0 207 0 464 2888 38 7.9 0 0|0|0 7 08:22:40
191 191 0 0 0 192 0 464 2888 38 8 0 0|0|0 7 08:22:41
191 194 0 0 0 192 0 464 2888 39 13.5 0 5|3|2 7 08:22:42
223 217 0 0 0 224 0 464 2888 39 8.4 0 0|0|0 7 08:22:43
215 219 0 0 0 216 0 464 2888 39 8.5 0 0|0|0 7 08:22:44
223 222 0 0 0 224 0 464 2888 39 8.4 0 0|0|0 7 08:22:45
223 225 0 0 0 224 0 464 2888 40 6.8 0 1|0|1 7 08:22:46
217 216 0 0 0 218 0 464 2888 40 9.2 0 0|0|0 7 08:22:47
225 225 0 0 0 226 0 464 2888 40 6.5 0 0|0|0 7 08:22:48
196 192 0 0 0 197 0 464 2888 40 7.5 0 0|0|0 7 08:22:49
6 10 2 0 0 7 0 464 2888 40 0.2 0 0|0|0 7 08:22:50
10 33 4 0 6 17 0 464 2889 45 0.9 0 0|0|0 7 08:22:51
20 38 0 0 3 33 0 464 2890 48 1.7 0 0|0|0 7 08:22:52
1 21 0 0 6 12 0 464 2891 48 0.3 0 5|4|1 7 08:22:53
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:54
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:55
insert/s query/s update/s delete/s getmore/s command/s flushes/s mapped vsize res locked % idx miss % q t|r|w conn time
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:56
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:57
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:58
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:59
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:00
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:01
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:02
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:03
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:04
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:05
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:06
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:07
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:08
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:09
0 0 0 0 0 1 0 464 2891 45 0 0 5|4|1 7 08:23:10
0 0 0 0 0 1 1 464 2891 40 0 0 5|4|1 7 08:23:11
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:12
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:13
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:14
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:15
$ ps auwx | grep mongo
adam 34471 0.4 1.1 2964992 46188 ?? S 8:15AM 0:31.50 /usr/local/Cellar/mongodb/1.6.1-x86_64/bin/mongod run --config /usr/local/Cellar/mongodb/1.6.1-x86_64/mongod.conf
adam 34856 0.0 0.0 2435040 544 s000 S+ 8:31AM 0:00.00 grep mongo
adam 34845 0.0 0.1 623400 3716 s001 S+ 8:26AM 0:00.04 mongo
$ mongo --version
MongoDB shell version: 1.6.1