Last active
August 29, 2015 13:56
-
-
Save nojima/8817880 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "bufio" | |
import "flag" | |
import "fmt" | |
import "log" | |
import "os" | |
import "strings" | |
import "sync" | |
import "github.com/tpjg/goriakpbc" | |
// Options holds the settings collected from the command-line flags.
type Options struct {
	// Addr is the address of the riak server, passed to the client pool.
	Addr string
	// PoolSize is the number of connections requested for the client pool.
	PoolSize int
	// ContentType is the content type assigned to every inserted object.
	ContentType string
}
func insertItem(bucket *riak.Bucket, key string, value string, contentType string, | |
waitGroup *sync.WaitGroup) { | |
defer waitGroup.Done() | |
obj := bucket.NewObject(key) | |
obj.ContentType = contentType | |
obj.Indexes["key_bin"] = key | |
obj.Data = []byte(value) | |
obj.Store() | |
} | |
func bulkInsert(bucketName string, file *os.File, options *Options) { | |
if err := riak.ConnectClientPool(options.Addr, options.PoolSize); err != nil { | |
log.Fatal(err) | |
} | |
bucket, err := riak.NewBucket(bucketName) | |
if err != nil { | |
log.Fatal(err) | |
} | |
var waitGroup sync.WaitGroup | |
scanner := bufio.NewScanner(file) | |
for i := 1; scanner.Scan(); i += 1 { | |
row := strings.SplitN(scanner.Text(), "\t", 2) | |
if len(row) != 2 { | |
log.Fatal("invalid format at line %v.", i) | |
} | |
waitGroup.Add(1) | |
go insertItem(bucket, row[0], row[1], options.ContentType, &waitGroup) | |
} | |
if err := scanner.Err(); err != nil { | |
log.Fatal(err) | |
} | |
waitGroup.Wait() | |
} | |
// printUsage writes the help text to standard error: the usage line, a
// description of the positional arguments, and one line per registered
// flag showing its name, usage string and default value.
func printUsage() {
	out := os.Stderr

	fmt.Fprint(out, "Usage: riak-bulk-insert [options] BUCKET FILE\n\n")
	fmt.Fprint(out, "Arguments:\n"+
		" BUCKET the name of the bucket where objects inserted.\n"+
		" FILE the name of the file containing a tab-separated\n"+
		" (key, value) pair for each line.\n\n")
	fmt.Fprint(out, "Options:\n")

	// describe prints a single flag; VisitAll also visits flags that were
	// not set on the command line.
	describe := func(f *flag.Flag) {
		fmt.Fprintf(out, " -%-11v %v default: %v\n", f.Name, f.Usage, f.DefValue)
	}
	flag.VisitAll(describe)
}
func main() { | |
options := new(Options) | |
flag.StringVar(&options.Addr, "addr", "localhost:8087", | |
"the hostname and port number of the riak server.") | |
flag.IntVar(&options.PoolSize, "pool", 10, | |
"the size of the connection pool") | |
flag.StringVar(&options.ContentType, "content-type", "text/plain", | |
"the content type of the inserted objects") | |
flag.Parse() | |
if flag.NArg() != 2 { | |
printUsage(); | |
os.Exit(1) | |
} | |
bucket := flag.Arg(0) | |
filename := flag.Arg(1) | |
file, err := os.Open(filename) | |
if err != nil { | |
log.Fatal(err) | |
} | |
bulkInsert(bucket, file, options) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment