Created
August 14, 2012 22:57
-
-
Save mpobrien/3353732 to your computer and use it in GitHub Desktop.
hadoop-43
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From c23186a9dd96ba5dee1b526beab5c91344928836 Mon Sep 17 00:00:00 2001 | |
From: Mike O'Brien <[email protected]> | |
Date: Tue, 14 Aug 2012 18:44:52 -0400 | |
Subject: [PATCH] hack to allow direct linking of mongouris | |
--- | |
build.sbt | 2 +- | |
.../main/java/com/mongodb/hadoop/MongoConfig.java | 5 ++ | |
.../com/mongodb/hadoop/util/MongoConfigUtil.java | 15 ++++- | |
.../com/mongodb/hadoop/util/MongoSplitter.java | 73 +++++++++++++------- | |
.../examples/treasury/TreasuryYieldXMLConfig.java | 3 + | |
.../src/main/resources/mongo-treasury_yield.xml | 26 +++++++- | |
6 files changed, 95 insertions(+), 29 deletions(-) | |
diff --git a/build.sbt b/build.sbt | |
index 186420b..19dbed3 100644 | |
--- a/build.sbt | |
+++ b/build.sbt | |
@@ -6,4 +6,4 @@ organization := "org.mongodb" | |
seq(net.virtualvoid.sbt.graph.Plugin.graphSettings: _*) | |
-hadoopRelease in ThisBuild := "default" | |
+hadoopRelease in ThisBuild := "cdh3" | |
diff --git a/core/src/main/java/com/mongodb/hadoop/MongoConfig.java b/core/src/main/java/com/mongodb/hadoop/MongoConfig.java | |
index 38825c4..2dabdaa 100644 | |
--- a/core/src/main/java/com/mongodb/hadoop/MongoConfig.java | |
+++ b/core/src/main/java/com/mongodb/hadoop/MongoConfig.java | |
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.*; | |
import org.apache.hadoop.mapreduce.*; | |
import java.io.*; | |
+import java.util.Collection; | |
// Hadoop | |
// Commons | |
@@ -276,6 +277,10 @@ public class MongoConfig { | |
MongoConfigUtil.setSplitSize( _conf, value ); | |
} | |
+ public Collection<String> getShardURIs(){ | |
+ return MongoConfigUtil.getShardURIs( _conf ); | |
+ } | |
+ | |
/** | |
* if TRUE, | |
* Splits will be read by connecting to the individual shard servers, | |
diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java | |
index 0369574..d3c99e1 100644 | |
--- a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java | |
+++ b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java | |
@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.*; | |
import org.apache.hadoop.io.*; | |
import org.apache.hadoop.mapreduce.*; | |
+import java.util.Collection; | |
+ | |
/** | |
* Configuration helper tool for MongoDB related Map/Reduce jobs | |
*/ | |
@@ -132,6 +134,12 @@ public class MongoConfigUtil { | |
*/ | |
public static final String SPLITS_SLAVE_OK = "mongo.input.split.allow_read_from_secondaries"; | |
+ | |
+ /** | |
+ * Comma-separated list of per-shard MongoDB URIs; when set, they are used instead of reading the shard list from the config db. | |
+ */ | |
+ public static final String SHARD_URIS = "mongo.input.split.shard_uris"; | |
+ | |
public static boolean isJobVerbose( Configuration conf ){ | |
return conf.getBoolean( JOB_VERBOSE, false ); | |
} | |
@@ -256,6 +264,7 @@ public class MongoConfigUtil { | |
public static DBCollection getCollection( MongoURI uri ){ | |
try { | |
+ log.info("connecting to " + uri.getHosts() + "/" + uri.getDatabase() + "." + uri.getCollection()); | |
Mongo mongo = _mongos.connect( uri ); | |
DB myDb = mongo.getDB(uri.getDatabase()); | |
@@ -266,7 +275,7 @@ public class MongoConfigUtil { | |
log.info("Sucessfully authenticated with collection."); | |
} | |
else { | |
- throw new IllegalArgumentException( "Unable to connect to collection." ); | |
+ throw new IllegalArgumentException( "Unable to connect to collection: " + uri.getDatabase() + "." + uri.getCollection() ); | |
} | |
} | |
return uri.connectCollection(mongo); | |
@@ -493,6 +502,10 @@ public class MongoConfigUtil { | |
public static String getInputSplitKeyPattern( Configuration conf ) { | |
return conf.get( INPUT_SPLIT_KEY_PATTERN, "{ \"_id\": 1 }" ); | |
} | |
+ | |
+ public static Collection<String> getShardURIs( Configuration conf ){ | |
+ return conf.getTrimmedStringCollection( SHARD_URIS ); | |
+ } | |
public static DBObject getInputSplitKey( Configuration conf ) { | |
try { | |
diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java | |
index e4b28a5..5d005d5 100644 | |
--- a/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java | |
+++ b/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java | |
@@ -48,8 +48,9 @@ public class MongoSplitter { | |
} | |
DB db = mongo.getDB( uri.getDatabase() ); | |
- DBCollection coll = db.getCollection( uri.getCollection() ); | |
+ DBCollection coll = MongoConfigUtil.getCollection( uri ); | |
final CommandResult stats = coll.getStats(); | |
+ log.debug(stats.toString()); | |
final boolean isSharded = stats.getBoolean( "sharded", false ); | |
@@ -89,13 +90,19 @@ public class MongoSplitter { | |
final int splitSize = conf.getSplitSize(); // in MB | |
final String ns = coll.getFullName(); | |
final DBObject q = conf.getQuery(); | |
- | |
+ log.info("uri: " + uri.getHosts() + "/" + uri.getDatabase() + "." + uri.getCollection()); | |
log.info( "Calculating unsharded input splits on namespace '" + ns + "' with Split Key '" + splitKey.toString() + "' and a split size of '" + splitSize + "'mb per" ); | |
final DBObject cmd = BasicDBObjectBuilder.start("splitVector", ns). | |
add( "keyPattern", splitKey ). | |
add( "force", false ). // force:True is misbehaving it seems | |
add( "maxChunkSize", splitSize ).get(); | |
+ | |
+ if(uri.getUsername() != null && uri.getPassword() != null){ | |
+ log.info("authenticating as [" + uri.getUsername() + "]"); | |
+ boolean auth = coll.getDB().authenticate(uri.getUsername(), uri.getPassword()); | |
+ if ( !auth ) log.warn("authentication failed for user [" + uri.getUsername() + "]"); | |
+ } | |
log.trace( "Issuing Command: " + cmd ); | |
CommandResult data = coll.getDB().command( cmd ); | |
@@ -206,31 +213,47 @@ public class MongoSplitter { | |
Boolean slaveOk ){ | |
log.warn( "WARNING getting splits that connect directly to the backend mongods" | |
+ " is risky and might not produce correct results" ); | |
- DB configDb = mongo.getDB( "config" ); | |
- DBCollection shardsColl = configDb.getCollection( "shards" ); | |
- | |
- Set<String> shardSet = new java.util.HashSet<String>(); | |
- DBCursor cur = shardsColl.find(); | |
- try { | |
- while ( cur.hasNext() ){ | |
- final BasicDBObject row = (BasicDBObject) cur.next(); | |
- String host = row.getString( "host" ); | |
- int slashIndex = host.indexOf( '/' ); | |
- if ( slashIndex > 0 ) | |
- host = host.substring( slashIndex + 1 ); | |
- shardSet.add( host ); | |
- } | |
- } | |
- finally { | |
- if ( cur != null ) | |
- cur.close(); | |
- cur = null; | |
- } | |
- final List<InputSplit> splits = new ArrayList<InputSplit>( shardSet.size() ); | |
+ Collection<String> shardUriStrings = conf.getShardURIs(); | |
+ ArrayList<MongoURI> shardUris = new ArrayList<MongoURI>(); | |
+ | |
+ if(shardUriStrings == null || shardUriStrings.isEmpty()){ | |
+ DB configDb = mongo.getDB( "config" ); | |
+ DBCollection shardsColl = configDb.getCollection( "shards" ); | |
+ | |
+ Set<String> shardSet = new java.util.HashSet<String>(); | |
+ | |
+ DBCursor cur = shardsColl.find(); | |
+ try { | |
+ while ( cur.hasNext() ){ | |
+ final BasicDBObject row = (BasicDBObject) cur.next(); | |
+ String host = row.getString( "host" ); | |
+ int slashIndex = host.indexOf( '/' ); | |
+ if ( slashIndex > 0 ) | |
+ host = host.substring( slashIndex + 1 ); | |
+ shardSet.add( host ); | |
+ } | |
+ } | |
+ finally { | |
+ if ( cur != null ) | |
+ cur.close(); | |
+ cur = null; | |
+ } | |
+ for( String host : shardSet ){ | |
+ MongoURI thisUri = getNewURI( uri, host, slaveOk ); | |
+ shardUris.add(thisUri); | |
+ log.warn(thisUri); | |
+ } | |
+ }else{ | |
+ log.warn( "WARNING loading shard URIs directly from config file."); | |
+ for(String hostUri : shardUriStrings ){ | |
+ log.warn(hostUri); | |
+ shardUris.add( new MongoURI(hostUri) ); | |
+ } | |
+ } | |
+ final List<InputSplit> splits = new ArrayList<InputSplit>( shardUris.size() ); | |
//todo: using stats only get the shards that actually host data for this collection | |
- for ( String host : shardSet ){ | |
- MongoURI thisUri = getNewURI( uri, host, slaveOk ); | |
+ for ( MongoURI thisUri : shardUris ){ | |
splits.add( new MongoInputSplit( thisUri, conf.getInputKey(), conf.getQuery(), conf.getFields(), | |
conf.getSort(), conf.getLimit(), conf.getSkip(), | |
conf.isNoTimeout() ) ); // TODO - Should the input Key be the shard key? | |
diff --git a/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java b/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java | |
index 30f4309..58004da 100644 | |
--- a/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java | |
+++ b/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java | |
@@ -31,8 +31,11 @@ public class TreasuryYieldXMLConfig extends MongoTool { | |
static{ | |
// Load the XML config defined in hadoop-local.xml | |
Configuration.addDefaultResource( "src/examples/hadoop-local.xml" ); | |
Configuration.addDefaultResource( "src/examples/mongo-defaults.xml" ); | |
+ Configuration.addDefaultResource( "mongo-treasury_yield.xml" ); | |
} | |
public static void main( final String[] pArgs ) throws Exception{ | |
diff --git a/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml b/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml | |
index a1bdabc..673f1a3 100644 | |
--- a/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml | |
+++ b/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml | |
@@ -1,5 +1,26 @@ | |
<?xml version="1.0"?> | |
<configuration> | |
+ <!-- | |
+ <property> | |
+ <name>mongo.input.split.shardservers</name> | |
+ <value>false</value> | |
+ </property>--> | |
+ | |
+ <property> | |
+ <name>mongo.input.split.shard_uris</name> | |
+ <value>mongodb://127.0.0.1:4000/demo.yield_historical.in,mongodb://127.0.0.1:4003/demo.yield_historical.in</value> | |
+ </property> | |
+ | |
+ <property> | |
+ <name>mongo.input.split.read_shard_chunks</name> | |
+ <value>false</value> | |
+ </property> | |
+ | |
+ <property> | |
+ <name>mongo.input.split.read_from_shards</name> | |
+ <value>true</value> | |
+ </property> | |
+ | |
<property> | |
<!-- run the job verbosely ? --> | |
<name>mongo.job.verbose</name> | |
@@ -18,12 +39,12 @@ | |
<property> | |
<!-- If you are reading from mongo, the URI --> | |
<name>mongo.input.uri</name> | |
- <value>mongodb://127.0.0.1/mongo_hadoop.yield_historical.in</value> | |
+ <value>mongodb://mike:[email protected]:4007/demo.yield_historical.in</value> | |
</property> | |
<property> | |
<!-- If you are writing to mongo, the URI --> | |
<name>mongo.output.uri</name> | |
- <value>mongodb://127.0.0.1/mongo_hadoop.yield_historical.out</value> | |
+ <value>mongodb://mike:[email protected]:4007/demo.yield_historical.out</value> | |
</property> | |
<property> | |
<!-- The query, in JSON, to execute [OPTIONAL] --> | |
@@ -110,3 +131,4 @@ | |
</property> | |
</configuration> | |
+ | |
-- | |
1.7.4.4 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment