Created
August 30, 2010 15:32
-
-
Save skippy/557564 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Map/reduce (m/r) overview:
map: read values from collectionA and break each date/value pair into various time buckets (multiple emits per record)
reduce: merge each key and its emitted items into key => array of values
finalize: check whether a stored calculation already exists in collectionB and, if so, reuse some of its metadata; then populate a hash, run the calculations for each key, and insert/upsert the result into collectionB
Failing test:
Ruby acceptance test
6 threads:
- insert a bunch of records (right now one by one)
- run multiple map/reduce jobs
- the test then hangs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
gem 'bson', '1.0.7' | |
gem 'bson_ext', '1.0.7' | |
gem 'mongo', '1.0.8' | |
require 'bson' | |
require 'mongo' | |
require 'benchmark' | |
# NOTES: on my machine (MacBook Pro, 2.66 GHz Intel Core i7, dual-core hyperthreaded) this
# works until the number of threads is increased to 3; then it locks. In this example
# mongostat doesn't show any locks being held, though currentOp() does.
num_rows = 10_000
num_threads = 1

# The connection pool is sized to the number of worker threads.
conn = Mongo::Connection.new('localhost', nil, :pool_size => num_threads, :timeout => 10)

# Start fresh: drop whatever a previous run left behind before reopening the database.
db_name = 'mr_failure_test_db'
conn.drop_database(db_name)
db = conn.db(db_name)

# raw_data holds the generated input rows; calculations is written by the finalize function.
raw_data_collection = db.collection('raw_data')
calculations_collection = db.collection('calculations')
# STEP 1:
# Insert a bunch of raw data: random integers in 0...1000, each stamped with a datetime
# ranging from around now back to ~6.5 days ago (up to 500_000 seconds).
puts "Step 1: Inserting rows"

# Report progress roughly every 10% of the rows. The interval is clamped to 1 because
# num_rows / 10 is integer division: for num_rows < 10 it is 0, and `i % 0` raises
# ZeroDivisionError.
progress_step = [num_rows / 10, 1].max

num_rows.times do |i|
  # :safe => true makes each insert wait for the server's acknowledgement.
  raw_data_collection.insert({'value' => rand(1000), 'datetime' => Time.now - rand(500_000)}, :safe => true)
  puts " inserted #{(i / num_rows.to_f * 100).round}% of the rows" if i > 0 && i % progress_step == 0
end
# STEP 2:
# Set up the map/reduce:
#   map:      emit into a time bucket
#   reduce:   merge the values of each time bucket into one array
#   finalize: run a simple calculation and insert it into another collection
#
# ...and run this over multiple threads.
puts "STEP 2: running m/r against the collection over #{num_threads} threads"
sys_js_collection = db.collection('system.js')

# Saves +source+ (stripped of surrounding whitespace) in system.js under +name+, so that
# other server-side JavaScript can call it by that name.
store_js = lambda do |name, source|
  sys_js_collection.save({:_id => name, :value => BSON::Code.new(source.strip)}, :safe => true)
end

# value_cleanup: maps NaN to null and passes every other value through.
value_cleanup = <<-JS
function(val) {
return isNaN(val) ? null : val;
};
JS
store_js.call('value_cleanup', value_cleanup)

# test_map_func: emits the row's value, wrapped in {values: [...]}, under a key built
# from the day of the week of its datetime.
test_map_func = <<-JS
function(rawData) {
emit(rawData.datetime.getDay() + "_test_day", {values: [rawData.value]})
};
JS
store_js.call('test_map_func', test_map_func)

# test_reduce_func: concatenates every emitted `values` array for a key into a single
# pre-sized array, keeping the same {values: [...]} shape as the map output.
test_reduce_func = <<-JS
function(key, values) {
var totalSize = 0;
for ( var i=0; i<values.length; i++ ){
totalSize += values[i].values.length;
}
var results = {values: new Array(totalSize)};
var loc = 0;
for ( var i=0; i<values.length; i++ ){
for(var j=0; j<values[i].values.length; j++){
results.values[loc++] = values[i].values[j];
}
}
return results;
};
JS
store_js.call('test_reduce_func', test_reduce_func)

# test_finalize_func: returns {} when there are no values; otherwise loads the existing
# calculation for the key from db.calculations (or starts an empty one), fills in
# mean / n / sum / key / processed_at, saves it back to db.calculations and returns it.
test_finalize_func = <<-JS
function(key, results){
var num_results = results.values.length;
if(num_results == 0){
return {};
}
//this is for finding an individual calc
var calc = db.calculations.findOne({'key': key}) || {}
var sum = 0;
var n = 0;
for(var i=0; i < num_results; i++){
var val = results.values[i];
n +=1;
sum += val;
}
calc.mean = n > 0 ? value_cleanup(sum/n) : 0;
calc.n = value_cleanup(n);
calc.sum = value_cleanup(sum);
calc.key = key;
calc.processed_at = new Date(new Date().toUTCString());
db.calculations.save(calc);
return calc;
}
JS
store_js.call('test_finalize_func', test_finalize_func)
# Thin wrappers handed to map_reduce; each one delegates to the function of the same
# name that was saved in system.js.
map = BSON::Code.new("function(){return test_map_func(this)}")
reduce = BSON::Code.new("function(key,values){return test_reduce_func(key,values)}")
finalize = BSON::Code.new("function(key, results){return test_finalize_func(key, results)}")
mr_opts = {:finalize => finalize, :verbose => true, :keeptemp => false}

# One complete map/reduce run, bracketed by progress messages tagged with the worker index.
run_map_reduce = lambda do |worker|
  puts "launching thread: #{worker}"
  raw_data_collection.map_reduce(map, reduce, mr_opts)
  puts "finished with thread: #{worker}"
end

overall_runtime = Benchmark.realtime do
  threads = []
  begin
    # `threads` is only reassigned once every thread has been created.
    threads = num_threads.times.map { |i| Thread.new { run_map_reduce.call(i) } }
  ensure
    # Wait for every worker; join re-raises any exception a worker died with.
    threads.each(&:join)
  end
end

puts "M/R finished for #{num_threads} threads in #{overall_runtime.round} secs. Inserted #{calculations_collection.count} calculations"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// NOTE: some names changed below... they will be in all caps | |
MongoDB shell version: 1.6.1 | |
connecting to: test | |
> db.currentOp() | |
{ | |
"inprog" : [ | |
{ | |
"opid" : 127465, | |
"active" : false, | |
"lockType" : "read", | |
"waitingForLock" : true, | |
"op" : "query", | |
"ns" : "?MY_DB-test.COLLECTION_A", | |
"query" : { | |
"$query" : { | |
"META_DATA_HASH.OWNER_ID" : 7, | |
"relative_time" : { | |
"$gte" : "Sun Jan 31 2010 16:00:00 GMT-0800 (PST)", | |
"$lte" : "Sun Feb 28 2010 15:59:59 GMT-0800 (PST)" | |
} | |
}, | |
"$orderby" : { | |
"relative_time" : -1 | |
} | |
}, | |
"client" : "127.0.0.1:51340", | |
"desc" : "conn" | |
}, | |
{ | |
"opid" : 127451, | |
"active" : true, | |
"lockType" : "read", | |
"waitingForLock" : false, | |
"secs_running" : 232, | |
"op" : "query", | |
"ns" : "MY_DB-test.COLLECTION_A", | |
"query" : { | |
"$msg" : "query not recording (too large)" | |
}, | |
"client" : "127.0.0.1:51341", | |
"desc" : "conn", | |
"msg" : "m/r: (1/3) emit phase 300/319 94%" | |
}, | |
{ | |
"opid" : 127464, | |
"active" : false, | |
"lockType" : "read", | |
"waitingForLock" : true, | |
"op" : "query", | |
"ns" : "?MY_DB-test.COLLECTION_A", | |
"query" : { | |
"$query" : { | |
"META_DATA_HASH.OWNER_ID" : 9, | |
"relative_time" : { | |
"$gte" : "Sun Jan 31 2010 16:00:00 GMT-0800 (PST)", | |
"$lte" : "Sun Feb 28 2010 15:59:59 GMT-0800 (PST)" | |
} | |
}, | |
"$orderby" : { | |
"relative_time" : -1 | |
} | |
}, | |
"client" : "127.0.0.1:51305", | |
"desc" : "conn" | |
}, | |
{ | |
"opid" : 127463, | |
"active" : false, | |
"lockType" : "write", | |
"waitingForLock" : true, | |
"op" : "insert", | |
"ns" : "?", | |
"client" : "0.0.0.0:0", | |
"desc" : "conn" | |
}, | |
{ | |
"opid" : 127466, | |
"active" : false, | |
"lockType" : "read", | |
"waitingForLock" : true, | |
"op" : "query", | |
"ns" : "?MY_DB-test.COLLECTION_A", | |
"query" : { | |
"$query" : { | |
"starts_at" : { | |
"$gte" : "Tue Jan 19 2010 16:00:00 GMT-0800 (PST)" | |
}, | |
"META_DATA_HASH" : { | |
"OWNER_ID" : 8 | |
}, | |
"range" : "daily", | |
"n" : { | |
"$gt" : 0 | |
}, | |
"stops_at" : { | |
"$lte" : "Tue Apr 20 2010 17:00:00 GMT-0700 (PDT)" | |
} | |
}, | |
"$orderby" : { | |
"starts_at" : -1 | |
} | |
}, | |
"client" : "127.0.0.1:51342", | |
"desc" : "conn" | |
}, | |
{ | |
"opid" : 127467, | |
"active" : false, | |
"lockType" : "read", | |
"waitingForLock" : true, | |
"op" : "query", | |
"ns" : "MY_DB-test.COLLECTION_A", | |
"query" : { | |
"$query" : { | |
"META_DATA_HASH.OWNER_ID" : 6, | |
"relative_time" : { | |
"$gte" : "Sun Jan 31 2010 16:00:00 GMT-0800 (PST)", | |
"$lte" : "Sun Feb 28 2010 15:59:59 GMT-0800 (PST)" | |
} | |
}, | |
"$orderby" : { | |
"relative_time" : -1 | |
} | |
}, | |
"client" : "127.0.0.1:51343", | |
"desc" : "conn" | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
insert/s query/s update/s delete/s getmore/s command/s flushes/s mapped vsize res locked % idx miss % q t|r|w conn time | |
225 223 0 0 0 226 0 464 2887 33 10.6 0 0|0|0 7 08:22:16 | |
222 222 0 0 0 223 0 464 2887 33 6.7 0 0|0|0 7 08:22:17 | |
233 232 0 0 0 234 0 464 2887 34 6.9 0 0|0|0 7 08:22:18 | |
218 220 0 0 0 219 0 464 2888 34 9.3 0 0|0|0 7 08:22:19 | |
225 224 0 0 0 226 0 464 2888 34 6.5 0 0|0|0 7 08:22:20 | |
191 190 0 0 0 192 0 464 2888 34 5.4 0 0|0|0 7 08:22:21 | |
223 226 0 0 0 224 0 464 2888 35 6.5 0 0|0|0 7 08:22:22 | |
213 214 0 0 0 214 0 464 2888 35 10.9 0 0|0|0 7 08:22:23 | |
220 218 0 0 0 221 0 464 2888 35 10.3 0 0|0|0 7 08:22:24 | |
226 224 0 0 0 227 0 464 2888 35 6.7 0 0|0|0 7 08:22:25 | |
221 222 0 0 0 222 0 464 2888 35 8.2 0 0|0|0 7 08:22:26 | |
219 221 0 0 0 220 0 464 2888 36 8.7 0 0|0|0 7 08:22:27 | |
199 199 0 0 0 200 0 464 2888 36 9.8 0 0|0|0 7 08:22:28 | |
192 190 0 0 0 193 0 464 2888 36 5.8 0 0|0|0 7 08:22:29 | |
193 194 0 0 0 194 0 464 2888 36 8.4 0 0|0|0 7 08:22:30 | |
204 204 0 0 0 205 0 464 2888 36 11.9 0 0|0|0 7 08:22:31 | |
176 174 0 0 0 177 0 464 2888 37 14 0 0|0|0 7 08:22:32 | |
201 202 0 0 0 202 0 464 2888 37 10.4 0 0|0|0 7 08:22:33 | |
212 215 0 0 0 213 0 464 2888 37 12.3 0 0|0|0 7 08:22:34 | |
215 212 0 0 0 216 0 464 2888 37 8.4 0 0|0|0 7 08:22:35 | |
insert/s query/s update/s delete/s getmore/s command/s flushes/s mapped vsize res locked % idx miss % q t|r|w conn time | |
215 217 0 0 0 216 0 464 2888 37 9.7 0 0|0|0 7 08:22:36 | |
214 212 0 0 0 215 0 464 2888 38 10.9 0 0|0|0 7 08:22:37 | |
224 227 0 0 0 225 0 464 2888 38 9 0 0|0|0 7 08:22:38 | |
223 220 0 0 0 224 0 464 2888 38 9 0 0|0|0 7 08:22:39 | |
206 207 0 0 0 207 0 464 2888 38 7.9 0 0|0|0 7 08:22:40 | |
191 191 0 0 0 192 0 464 2888 38 8 0 0|0|0 7 08:22:41 | |
191 194 0 0 0 192 0 464 2888 39 13.5 0 5|3|2 7 08:22:42 | |
223 217 0 0 0 224 0 464 2888 39 8.4 0 0|0|0 7 08:22:43 | |
215 219 0 0 0 216 0 464 2888 39 8.5 0 0|0|0 7 08:22:44 | |
223 222 0 0 0 224 0 464 2888 39 8.4 0 0|0|0 7 08:22:45 | |
223 225 0 0 0 224 0 464 2888 40 6.8 0 1|0|1 7 08:22:46 | |
217 216 0 0 0 218 0 464 2888 40 9.2 0 0|0|0 7 08:22:47 | |
225 225 0 0 0 226 0 464 2888 40 6.5 0 0|0|0 7 08:22:48 | |
196 192 0 0 0 197 0 464 2888 40 7.5 0 0|0|0 7 08:22:49 | |
6 10 2 0 0 7 0 464 2888 40 0.2 0 0|0|0 7 08:22:50 | |
10 33 4 0 6 17 0 464 2889 45 0.9 0 0|0|0 7 08:22:51 | |
20 38 0 0 3 33 0 464 2890 48 1.7 0 0|0|0 7 08:22:52 | |
1 21 0 0 6 12 0 464 2891 48 0.3 0 5|4|1 7 08:22:53 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:54 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:55 | |
insert/s query/s update/s delete/s getmore/s command/s flushes/s mapped vsize res locked % idx miss % q t|r|w conn time | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:56 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:57 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:58 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:22:59 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:00 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:01 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:02 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:03 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:04 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:05 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:06 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:07 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:08 | |
0 0 0 0 0 1 0 464 2891 48 0 0 5|4|1 7 08:23:09 | |
0 0 0 0 0 1 0 464 2891 45 0 0 5|4|1 7 08:23:10 | |
0 0 0 0 0 1 1 464 2891 40 0 0 5|4|1 7 08:23:11 | |
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:12 | |
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:13 | |
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:14 | |
0 0 0 0 0 1 0 464 2891 40 0 0 5|4|1 7 08:23:15 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ ps auwx | grep mongo | |
adam 34471 0.4 1.1 2964992 46188 ?? S 8:15AM 0:31.50 /usr/local/Cellar/mongodb/1.6.1-x86_64/bin/mongod run --config /usr/local/Cellar/mongodb/1.6.1-x86_64/mongod.conf | |
adam 34856 0.0 0.0 2435040 544 s000 S+ 8:31AM 0:00.00 grep mongo | |
adam 34845 0.0 0.1 623400 3716 s001 S+ 8:26AM 0:00.04 mongo | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ mongo --version | |
MongoDB shell version: 1.6.1 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment