Created
June 23, 2012 03:35
-
-
Save jmshoffs0812/2976699 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.foo; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
import com.basho.riak.client.IRiakClient; | |
import com.basho.riak.client.IRiakObject; | |
import com.basho.riak.client.RiakException; | |
import com.basho.riak.client.RiakFactory; | |
import com.basho.riak.client.bucket.Bucket; | |
import com.basho.riak.client.raw.pbc.PBClientConfig; | |
import com.basho.riak.client.raw.pbc.PBClusterConfig; | |
public class App { | |
private static Logger logger = Logger.getLogger(App.class.getName()); | |
public static void main(String[] args) { | |
int maxConnections = 20; | |
String sourceIP = "source.com"; | |
String destIP = "dest.com"; | |
if(args.length >= 1 && null != args[0]) { | |
sourceIP = args[0]; | |
} | |
if(args.length >= 2 && null != args[1]) { | |
sourceIP = args[1]; | |
} | |
if(args.length >= 3 && null != args[2]) { | |
try { | |
maxConnections = Integer.parseInt(args[2]); | |
} catch (Exception e) { | |
logger.severe("Wrong value for Riak pool size: " + args[2] + " - using default value: " + maxConnections); | |
} | |
} | |
String sourceHTTP = "http://" + sourceIP + ":8098/riak"; | |
String destHTTP = "http://" + destIP + ":8098/riak"; | |
riakBackup(maxConnections, sourceIP, destIP, sourceHTTP, destHTTP); | |
} | |
private static void riakBackup(int maxConnections, String sourceIP, String destIP, String sourceHTTP, String destHTTP) { | |
try { | |
PBClusterConfig sourcePbClusterConfig = new PBClusterConfig(maxConnections); | |
PBClientConfig sourcePbClientConfig = PBClientConfig.defaults(); | |
sourcePbClusterConfig.addHosts(sourcePbClientConfig, sourceIP); | |
PBClusterConfig destPbClusterConfig = new PBClusterConfig(maxConnections); | |
PBClientConfig destPbClientConfig = PBClientConfig.defaults(); | |
destPbClusterConfig.addHosts(destPbClientConfig, destIP); | |
IRiakClient sourceClient = RiakFactory.newClient(sourcePbClusterConfig); | |
Iterable<String> bucketList = sourceClient.listBuckets(); | |
sourceClient.shutdown(); | |
sourceClient = null; | |
for (String bucketName : bucketList) { | |
logger.info("Bucket name: " + bucketName); | |
IRiakClient sClient = RiakFactory.newClient(sourcePbClusterConfig); | |
IRiakClient dClient = RiakFactory.newClient(destPbClusterConfig); | |
IRiakClient sBucketClient = RiakFactory.httpClient(sourceHTTP); | |
IRiakClient dBucketClient = RiakFactory.httpClient(destHTTP); | |
try { | |
//Can only set/get bucket props via HTTP | |
Bucket sourceBucket = sBucketClient.fetchBucket(bucketName).execute(); | |
Bucket destBucket = dBucketClient.updateBucket(sourceBucket).execute(); | |
sourceBucket = sClient.fetchBucket(bucketName).execute(); | |
destBucket = dClient.fetchBucket(bucketName).execute(); | |
sBucketClient.shutdown(); | |
dBucketClient.shutdown(); | |
logger.info("Got both source and dest buckets..."); | |
Iterable<String> keyList = sourceBucket.keys(); | |
for (String key : keyList) { | |
logger.info("Key name: " + key); | |
try { | |
IRiakObject myObject = sourceBucket.fetch(key).execute(); | |
IRiakObject destObject = destBucket.store(myObject).execute(); | |
if (myObject != destObject) { | |
System.err.print(destObject); | |
} | |
} catch (Exception e) { | |
logger.log(Level.SEVERE, "Error processing key: " + key, e); | |
} | |
} | |
} catch (Exception e) { | |
logger.log(Level.SEVERE, "Error processing bucketName: " + bucketName, e); | |
} | |
sClient.shutdown(); | |
dClient.shutdown(); | |
} | |
} catch (RiakException e) { | |
logger.log(Level.SEVERE, "RiakException: ", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment