Skip to content

Instantly share code, notes, and snippets.

@mpobrien
Created August 14, 2012 22:57
Show Gist options
  • Save mpobrien/3353732 to your computer and use it in GitHub Desktop.
Save mpobrien/3353732 to your computer and use it in GitHub Desktop.
hadoop-43
From c23186a9dd96ba5dee1b526beab5c91344928836 Mon Sep 17 00:00:00 2001
From: Mike O'Brien <[email protected]>
Date: Tue, 14 Aug 2012 18:44:52 -0400
Subject: [PATCH] hack to allow shard MongoURIs to be specified directly in the job config
---
build.sbt | 2 +-
.../main/java/com/mongodb/hadoop/MongoConfig.java | 5 ++
.../com/mongodb/hadoop/util/MongoConfigUtil.java | 15 ++++-
.../com/mongodb/hadoop/util/MongoSplitter.java | 73 +++++++++++++-------
.../examples/treasury/TreasuryYieldXMLConfig.java | 3 +
.../src/main/resources/mongo-treasury_yield.xml | 26 +++++++-
6 files changed, 95 insertions(+), 29 deletions(-)
diff --git a/build.sbt b/build.sbt
index 186420b..19dbed3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,4 +6,4 @@ organization := "org.mongodb"
seq(net.virtualvoid.sbt.graph.Plugin.graphSettings: _*)
-hadoopRelease in ThisBuild := "default"
+hadoopRelease in ThisBuild := "cdh3" // NOTE(review): switches the whole build from "default" to CDH3; presumably needed for Configuration.getTrimmedStringCollection used by MongoConfigUtil.getShardURIs - confirm
diff --git a/core/src/main/java/com/mongodb/hadoop/MongoConfig.java b/core/src/main/java/com/mongodb/hadoop/MongoConfig.java
index 38825c4..2dabdaa 100644
--- a/core/src/main/java/com/mongodb/hadoop/MongoConfig.java
+++ b/core/src/main/java/com/mongodb/hadoop/MongoConfig.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.*;
+import java.util.Collection;
// Hadoop
// Commons
@@ -276,6 +277,10 @@ public class MongoConfig {
MongoConfigUtil.setSplitSize( _conf, value );
}
+ public Collection<String> getShardURIs(){ // shard URIs from property mongo.input.split.shard_uris (MongoConfigUtil.SHARD_URIS), via MongoConfigUtil.getShardURIs
+ return MongoConfigUtil.getShardURIs( _conf );
+ }
+
/**
* if TRUE,
* Splits will be read by connecting to the individual shard servers,
diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java
index 0369574..d3c99e1 100644
--- a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java
+++ b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
+import java.util.Collection;
+
/**
* Configuration helper tool for MongoDB related Map/Reduce jobs
*/
@@ -132,6 +134,12 @@ public class MongoConfigUtil {
*/
public static final String SPLITS_SLAVE_OK = "mongo.input.split.allow_read_from_secondaries";
+
+ /**
+ * Comma-separated shard MongoURIs to read splits from directly, instead of listing the shards from the config database.
+ */
+ public static final String SHARD_URIS = "mongo.input.split.shard_uris";
+
public static boolean isJobVerbose( Configuration conf ){
return conf.getBoolean( JOB_VERBOSE, false );
}
@@ -256,6 +264,7 @@ public class MongoConfigUtil {
public static DBCollection getCollection( MongoURI uri ){
try {
+ log.info( "connecting to " + uri.getHosts() + " (" + uri.getDatabase() + "." + uri.getCollection() + ")" ); // not uri.toString(): the raw URI may embed the password
Mongo mongo = _mongos.connect( uri );
DB myDb = mongo.getDB(uri.getDatabase());
@@ -266,7 +275,7 @@ public class MongoConfigUtil {
log.info("Sucessfully authenticated with collection.");
}
else {
- throw new IllegalArgumentException( "Unable to connect to collection." );
+ throw new IllegalArgumentException( "Unable to connect to collection: " + uri.getDatabase() + "." + uri.getCollection() + " on " + uri.getHosts() );
}
}
return uri.connectCollection(mongo);
@@ -493,6 +502,10 @@ public class MongoConfigUtil {
public static String getInputSplitKeyPattern( Configuration conf ) {
return conf.get( INPUT_SPLIT_KEY_PATTERN, "{ \"_id\": 1 }" );
}
+
+ public static Collection<String> getShardURIs( Configuration conf ){ // trimmed, comma-separated values of SHARD_URIS
+ return conf.getTrimmedStringCollection( SHARD_URIS );
+ }
public static DBObject getInputSplitKey( Configuration conf ) {
try {
diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java
index e4b28a5..5d005d5 100644
--- a/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java
+++ b/core/src/main/java/com/mongodb/hadoop/util/MongoSplitter.java
@@ -48,8 +48,9 @@ public class MongoSplitter {
}
DB db = mongo.getDB( uri.getDatabase() );
- DBCollection coll = db.getCollection( uri.getCollection() );
+ DBCollection coll = MongoConfigUtil.getCollection( uri );
final CommandResult stats = coll.getStats();
+ log.debug( "collection stats: " + stats );
final boolean isSharded = stats.getBoolean( "sharded", false );
@@ -89,13 +90,19 @@ public class MongoSplitter {
final int splitSize = conf.getSplitSize(); // in MB
final String ns = coll.getFullName();
final DBObject q = conf.getQuery();
-
+ log.debug( "splitting against hosts " + uri.getHosts() ); // not uri.toString(): the raw URI may embed the password
log.info( "Calculating unsharded input splits on namespace '" + ns + "' with Split Key '" + splitKey.toString() + "' and a split size of '" + splitSize + "'mb per" );
final DBObject cmd = BasicDBObjectBuilder.start("splitVector", ns).
add( "keyPattern", splitKey ).
add( "force", false ). // force:True is misbehaving it seems
add( "maxChunkSize", splitSize ).get();
+ // Authenticate with the URI's credentials before issuing splitVector; only the user name is ever logged.
+ if(uri.getUsername() != null && uri.getPassword() != null){
+ log.info( "authenticating to '" + ns + "' as [" + uri.getUsername() + "]" );
+ final boolean auth = coll.getDB().authenticate( uri.getUsername(), uri.getPassword() );
+ if ( !auth ) log.warn( "authentication as [" + uri.getUsername() + "] failed for '" + ns + "'" );
+ }
log.trace( "Issuing Command: " + cmd );
CommandResult data = coll.getDB().command( cmd );
@@ -206,31 +213,47 @@ public class MongoSplitter {
Boolean slaveOk ){
log.warn( "WARNING getting splits that connect directly to the backend mongods"
+ " is risky and might not produce correct results" );
- DB configDb = mongo.getDB( "config" );
- DBCollection shardsColl = configDb.getCollection( "shards" );
-
- Set<String> shardSet = new java.util.HashSet<String>();
- DBCursor cur = shardsColl.find();
- try {
- while ( cur.hasNext() ){
- final BasicDBObject row = (BasicDBObject) cur.next();
- String host = row.getString( "host" );
- int slashIndex = host.indexOf( '/' );
- if ( slashIndex > 0 )
- host = host.substring( slashIndex + 1 );
- shardSet.add( host );
- }
- }
- finally {
- if ( cur != null )
- cur.close();
- cur = null;
- }
- final List<InputSplit> splits = new ArrayList<InputSplit>( shardSet.size() );
+ Collection<String> shardUriStrings = conf.getShardURIs();
+ List<MongoURI> shardUris = new ArrayList<MongoURI>();
+ // Fall back to the config database when no shard URIs are configured: Hadoop returns an empty collection (not null) for an unset property.
+ if ( shardUriStrings == null || shardUriStrings.isEmpty() ){
+ DB configDb = mongo.getDB( "config" );
+ DBCollection shardsColl = configDb.getCollection( "shards" );
+
+ Set<String> shardSet = new java.util.HashSet<String>();
+ DBCursor cur = shardsColl.find();
+ try {
+ while ( cur.hasNext() ){
+ final BasicDBObject row = (BasicDBObject) cur.next();
+ String host = row.getString( "host" );
+ int slashIndex = host.indexOf( '/' );
+ if ( slashIndex > 0 )
+ host = host.substring( slashIndex + 1 );
+ shardSet.add( host );
+ }
+ }
+ finally {
+ if ( cur != null )
+ cur.close();
+ cur = null;
+ }
+ for( String host : shardSet ){
+ MongoURI thisUri = getNewURI( uri, host, slaveOk );
+ shardUris.add(thisUri);
+ log.info( "direct shard split host: " + host );
+ }
+ }else{
+ log.warn( "WARNING loading shard URIs directly from config file.");
+ for(String hostUri : shardUriStrings ){ // used as given: not passed through getNewURI, so slaveOk is not applied here
+ final MongoURI shardUri = new MongoURI( hostUri );
+ shardUris.add( shardUri );
+ log.info( "direct shard split hosts: " + shardUri.getHosts() ); // hosts only, never the raw URI (may hold a password)
+ }
+ }
+ final List<InputSplit> splits = new ArrayList<InputSplit>( shardUris.size() );
//todo: using stats only get the shards that actually host data for this collection
- for ( String host : shardSet ){
- MongoURI thisUri = getNewURI( uri, host, slaveOk );
+ for ( MongoURI thisUri : shardUris ){
splits.add( new MongoInputSplit( thisUri, conf.getInputKey(), conf.getQuery(), conf.getFields(),
conf.getSort(), conf.getLimit(), conf.getSkip(),
conf.isNoTimeout() ) ); // TODO - Should the input Key be the shard key?
diff --git a/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java b/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java
index 30f4309..58004da 100644
--- a/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java
+++ b/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldXMLConfig.java
@@ -31,8 +31,11 @@ public class TreasuryYieldXMLConfig extends MongoTool {
static{
// Load the XML config defined in hadoop-local.xml
Configuration.addDefaultResource( "src/examples/hadoop-local.xml" );
Configuration.addDefaultResource( "src/examples/mongo-defaults.xml" );
+ Configuration.addDefaultResource( "mongo-treasury_yield.xml" ); // the example's own config (src/main/resources); resolved by resource name, presumably from the classpath
}
public static void main( final String[] pArgs ) throws Exception{
diff --git a/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml b/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml
index a1bdabc..673f1a3 100644
--- a/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml
+++ b/examples/treasury_yield/src/main/resources/mongo-treasury_yield.xml
@@ -1,5 +1,26 @@
<?xml version="1.0"?>
<configuration>
+ <!--
+ Shards to connect to directly when splits are read from the shards
+ (presumably enabled by mongo.input.split.read_from_shards below), instead
+ of listing them from the config database. URIs are used as given.
+ -->
+
+ <property>
+ <name>mongo.input.split.shard_uris</name>
+ <value>mongodb://127.0.0.1:4000/demo.yield_historical.in,mongodb://127.0.0.1:4003/demo.yield_historical.in</value>
+ </property>
+
+ <property>
+ <name>mongo.input.split.read_shard_chunks</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>mongo.input.split.read_from_shards</name>
+ <value>true</value>
+ </property>
+
<property>
<!-- run the job verbosely ? -->
<name>mongo.job.verbose</name>
@@ -18,12 +39,12 @@
<property>
<!-- If you are reading from mongo, the URI -->
<name>mongo.input.uri</name>
- <value>mongodb://127.0.0.1/mongo_hadoop.yield_historical.in</value>
+ <value>mongodb://127.0.0.1:4007/demo.yield_historical.in</value> <!-- add user:password@ before the host if auth is enabled; never commit real credentials -->
</property>
<property>
<!-- If you are writing to mongo, the URI -->
<name>mongo.output.uri</name>
- <value>mongodb://127.0.0.1/mongo_hadoop.yield_historical.out</value>
+ <value>mongodb://127.0.0.1:4007/demo.yield_historical.out</value> <!-- add user:password@ before the host if auth is enabled; never commit real credentials -->
</property>
<property>
<!-- The query, in JSON, to execute [OPTIONAL] -->
@@ -110,3 +131,4 @@
</property>
</configuration>
+
--
1.7.4.4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment