Created
September 1, 2017 01:11
-
-
Save hectorgool/80cfa09ae2fa91adaf5c141112b44a9d to your computer and use it in GitHub Desktop.
Load a huge file faster in Elasticsearch Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
twitter@hector_gool | |
https://github.com/olivere/elastic/wiki/BulkIndex | |
*/ | |
package main | |
import ( | |
"fmt" | |
elastic "gopkg.in/olivere/elastic.v5" | |
"encoding/csv" | |
"github.com/satori/go.uuid" | |
"context" | |
"os" | |
"log" | |
"strconv" | |
) | |
type ( | |
Document struct { | |
Ciudad string `json:"ciudad"` | |
Colonia string `json:"colonia"` | |
Cp string `json:"cp"` | |
Delegacion string `json:"delegacion"` | |
Location `json:"location"` | |
} | |
Location struct { | |
Lat float64 `json:"lat"` | |
Lon float64 `json:"lon"` | |
} | |
) | |
const ( | |
FILE ="./MX.txt" | |
TOTAL_ROWS = 1000000 | |
) | |
var ( | |
client *elastic.Client | |
) | |
func init() { | |
var err error | |
client, err = elastic.NewClient( | |
elastic.SetURL(os.Getenv("ELASTICSEARCH_ENTRYPOINT")), | |
elastic.SetBasicAuth(os.Getenv("ELASTICSEARCH_USERNAME"), os.Getenv("ELASTICSEARCH_PASSWORD")), | |
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), | |
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), | |
) | |
printError(err) | |
} | |
func main() { | |
ctx := context.Background() | |
file, err := os.Open(FILE) | |
printError(err) | |
defer file.Close() | |
reader := csv.NewReader(file) | |
reader.Comma = '\t' | |
rows, err := reader.ReadAll() | |
printError(err) | |
bulkRequest := client.Bulk() | |
for n, col := range rows { | |
n++ | |
id := uuid.NewV4().String() | |
if n <= TOTAL_ROWS { | |
lat, err := strconv.ParseFloat(col[9], 64) | |
printError(err) | |
lon, err := strconv.ParseFloat(col[10], 64) | |
printError(err) | |
document := Document{ | |
Ciudad: col[3], | |
Colonia: col[2], | |
Cp: col[1], | |
Delegacion: col[5], | |
Location: Location{ | |
Lat: lat, | |
Lon: lon, | |
}, | |
} | |
req := elastic.NewBulkIndexRequest().Index(os.Getenv("ELASTICSEARCH_INDEX")).Type(os.Getenv("ELASTICSEARCH_TYPE")).Id(id).Doc(document) | |
bulkRequest = bulkRequest.Add(req) | |
fmt.Printf("%v: %v\n", n, document) | |
} | |
} | |
bulkResponse, err := bulkRequest.Do(ctx) | |
printError(err) | |
indexed := bulkResponse.Indexed() | |
if len(indexed) != 1 { | |
fmt.Printf("\n Indexed documents: %v \n", len(indexed)) | |
} | |
} | |
func printError(err error) { | |
if err != nil { | |
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) | |
os.Exit(1) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment