// Usage:
//
//	go run orcbulkimport.go --key="<orc_key>" --host="<orc_host>" <filename>
package main
| import ( | |
| "bufio" | |
| "compress/flate" | |
| "compress/gzip" | |
| "encoding/json" | |
| "flag" | |
| "fmt" | |
| "io" | |
| "io/ioutil" | |
| "log" | |
| "net" | |
| "net/http" | |
| "os" | |
| "sync" | |
| "time" | |
| ) | |
| // For older go releases (specifically 1.2 and earlier) there is an issue with | |
| // COMODO certificates since they sign with sha384 which is not included by | |
| // default. As such we force include the package to get the support. On newer | |
| // golang installs this does nothing. | |
| // For more information see this blog post: | |
| // http://bridge.grumpy-troll.org/2014/05/golang-tls-comodo/ | |
| import _ "crypto/sha512" | |
| var ( | |
| apiKey = flag.String("key", "00000000-0000-0000-0000-000000000000", "the api key") | |
| workerCount = flag.Int("workers", 8, "the number of worker procs") | |
| host = flag.String("host", "api.orchestrate.io", "the Orchestrate API host to use") | |
| reqs = make(chan Request, 100) | |
| dialTimeout = 3 * time.Second | |
| responseHeaderTimeout = 60 * time.Second | |
| wg sync.WaitGroup | |
| client *http.Client | |
| ) | |
type (
	// Request is one unit of work for the worker pool: a request body to
	// POST and the channel on which the outcome is reported.
	Request struct {
		// reader supplies the body of the HTTP request.
		reader io.Reader
		// respChan receives the Response produced for this request.
		respChan chan Response
	}

	// Response is a message delivered to a file's response handler. It is
	// either the outcome of one Request or the end-of-file marker.
	Response struct {
		// body is the decoded JSON reply, when there is one.
		body map[string]interface{}
		// err optionally points at the error associated with the request.
		err *error
		// eof marks the message announcing that the whole file was read.
		eof bool
		// total is the record count of the file; meaningful when eof is set.
		total int
	}
)
| func main() { | |
| flag.Parse() | |
| client = &http.Client{Transport: &http.Transport{ | |
| MaxIdleConnsPerHost: *workerCount, | |
| ResponseHeaderTimeout: responseHeaderTimeout, | |
| Dial: func(network, addr string) (net.Conn, error) { | |
| return net.DialTimeout(network, addr, dialTimeout) | |
| }, | |
| }} | |
| startRequestHandlerPool() | |
| for _, file := range flag.Args() { | |
| wg.Add(1) | |
| go func(file string) { | |
| importFile(file) | |
| }(file) | |
| } | |
| wg.Wait() | |
| close(reqs) | |
| } | |
| func startRequestHandlerPool() { | |
| for i := 0; i < *workerCount; i++ { | |
| go handleRequests(reqs) | |
| } | |
| } | |
| func importFile(filename string) { | |
| file, err := os.Open(filename) | |
| if err != nil { | |
| log.Printf("Error: %v\n", err) | |
| return | |
| } | |
| defer file.Close() | |
| stats, _ := file.Stat() | |
| fileSize := stats.Size() | |
| log.Printf("Importing %v", filename) | |
| reader := bufio.NewReaderSize(file, 1024*1024) | |
| var resps = make(chan Response, 100) | |
| go handleResponses(filename, fileSize, resps) | |
| var pReader *io.PipeReader | |
| var pWriter *io.PipeWriter | |
| var i int | |
| for i = 0; err == nil; i++ { | |
| if i%250 == 0 { | |
| if pWriter != nil { | |
| pWriter.Close() | |
| } | |
| pReader, pWriter = io.Pipe() | |
| reqs <- Request{pReader, resps} | |
| } | |
| var line []byte | |
| line, err = reader.ReadBytes('\n') | |
| pWriter.Write(line) | |
| } | |
| if pWriter != nil { | |
| pWriter.Close() | |
| } | |
| if err != nil && err != io.EOF { | |
| log.Panicf("Scanner error: %v\n", err) | |
| } | |
| resps <- Response{nil, nil, true, i-1} | |
| } | |
| func handleRequests(reqs chan Request) { | |
| for req := range reqs { | |
| var err error | |
| body := make(map[string]interface{}) | |
| if resp, err := jsonReply("POST", "", req.reader, 200, &body); err != nil { | |
| log.Printf("Error %v %v\n", err, resp) | |
| continue | |
| } | |
| req.respChan <- Response{body, &err, false, 0} | |
| } | |
| } | |
| func handleResponses(filename string, fileSize int64, resps chan Response) { | |
| var importCount, errorCount, totalCount int | |
| eof := false | |
| for resp := range resps { | |
| if resp.eof { | |
| eof = true | |
| totalCount = resp.total | |
| } | |
| if resp.err != nil { | |
| switch err := (*resp.err).(type) { | |
| case OrchestrateError: | |
| errorCount++ | |
| log.Printf("Error: %v", err) | |
| } | |
| } | |
| if resp.body != nil { | |
| if resp.body["status"] != "success" { | |
| log.Printf("%v: %v", resp.body["status"], resp.body["message"]) | |
| for _, result := range resp.body["results"].([]interface{}) { | |
| resultMap := result.(map[string]interface{}) | |
| if resultMap["status"].(string) == "failure" { | |
| log.Printf("Item failure: %v", resultMap["error"]) | |
| errorCount++ | |
| } | |
| } | |
| } | |
| importCount += int(resp.body["success_count"].(float64)) | |
| } | |
| if importCount%1000 == 0 { | |
| log.Printf("Progress imported %v items from %v", importCount, filename) | |
| } | |
| if eof && importCount >= totalCount - errorCount { | |
| close(resps) | |
| } | |
| } | |
| log.Printf("Done importing %v items from %v (with %v errors)", importCount, filename, errorCount) | |
| wg.Done() | |
| } | |
| // Executes an HTTP request. | |
| func doRequest( | |
| method, trailing string, headers map[string]string, body io.Reader, | |
| ) (*http.Response, error) { | |
| url := "https://" + *host + "/v0/" + trailing | |
| // Create the new Request. | |
| req, err := http.NewRequest(method, url, body) | |
| if err != nil { | |
| return nil, err | |
| } | |
| // Ensure that the query gets the authToken as username. | |
| req.SetBasicAuth(*apiKey, "") | |
| // Add any headers that the client provided. | |
| for k, v := range headers { | |
| req.Header.Add(k, v) | |
| } | |
| req.Header.Add("User-Agent", "orcbulkimport") | |
| req.Header.Add("Content-Type", "application/orchestrate-export-stream+json") | |
| return client.Do(req) | |
| } | |
| // This call will perform a request which expects a JSON body to be returned. | |
| // The contents of the body will be decoded into the value given. | |
| // | |
| // Any status return other than 'status' will cause an error to be returned | |
| // from this function. | |
| func jsonReply( | |
| method, path string, body io.Reader, status int, value interface{}, | |
| ) (*http.Response, error) { | |
| resp, err := doRequest(method, path, nil, body) | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer resp.Body.Close() | |
| // Ensure that the returned status was expected. | |
| if resp.StatusCode != status { | |
| return nil, newError(resp) | |
| } | |
| // See what kind of encoding the server is replying with. | |
| var decoder *json.Decoder | |
| switch resp.Header.Get("Content-Encoding") { | |
| case "gzip": | |
| gzipReader, err := gzip.NewReader(resp.Body) | |
| if err != nil { | |
| return nil, err | |
| } | |
| decoder = json.NewDecoder(gzipReader) | |
| case "deflate": | |
| decoder = json.NewDecoder(flate.NewReader(resp.Body)) | |
| default: | |
| decoder = json.NewDecoder(resp.Body) | |
| } | |
| // Decode the body into a json object. | |
| if err := decoder.Decode(value); err != nil { | |
| return nil, err | |
| } | |
| // Success! | |
| return resp, nil | |
| } | |
| func newError(resp *http.Response) error { | |
| // We need to ensure that the body is read no matter what otherwise | |
| // connections won't be reused. | |
| body, err := ioutil.ReadAll(resp.Body) | |
| if err != nil { | |
| return err | |
| } | |
| oe := &OrchestrateError{ | |
| Status: resp.Status, | |
| StatusCode: resp.StatusCode, | |
| } | |
| if err := json.Unmarshal(body, oe); err != nil { | |
| oe.Message = string(body) | |
| return oe | |
| } | |
| return oe | |
| } | |
// OrchestrateError is the error built from an HTTP response that did not
// carry the expected status.
type OrchestrateError struct {
	// Status is the status string of the HTTP response; never read from JSON.
	Status string `json:"-"`

	// StatusCode is the numeric status of the HTTP response; never read
	// from JSON.
	StatusCode int `json:"-"`

	// Message is the Orchestrate-specific description of the error.
	Message string `json:"message"`
}

// Error formats the error as "<status> (<code>): <message>".
func (e OrchestrateError) Error() string {
	return e.Status + " (" + fmt.Sprint(e.StatusCode) + "): " + e.Message
}
Usage