Skip to content

Instantly share code, notes, and snippets.

@hectorgool
Created September 1, 2017 01:11
Show Gist options
  • Save hectorgool/80cfa09ae2fa91adaf5c141112b44a9d to your computer and use it in GitHub Desktop.
Save hectorgool/80cfa09ae2fa91adaf5c141112b44a9d to your computer and use it in GitHub Desktop.
Load a huge file faster in Elasticsearch Server
/*
twitter@hector_gool
https://github.com/olivere/elastic/wiki/BulkIndex
*/
package main
import (
"fmt"
elastic "gopkg.in/olivere/elastic.v5"
"encoding/csv"
"github.com/satori/go.uuid"
"context"
"os"
"log"
"strconv"
)
type (
Document struct {
Ciudad string `json:"ciudad"`
Colonia string `json:"colonia"`
Cp string `json:"cp"`
Delegacion string `json:"delegacion"`
Location `json:"location"`
}
Location struct {
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
}
)
const (
FILE ="./MX.txt"
TOTAL_ROWS = 1000000
)
var (
client *elastic.Client
)
func init() {
var err error
client, err = elastic.NewClient(
elastic.SetURL(os.Getenv("ELASTICSEARCH_ENTRYPOINT")),
elastic.SetBasicAuth(os.Getenv("ELASTICSEARCH_USERNAME"), os.Getenv("ELASTICSEARCH_PASSWORD")),
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
)
printError(err)
}
func main() {
ctx := context.Background()
file, err := os.Open(FILE)
printError(err)
defer file.Close()
reader := csv.NewReader(file)
reader.Comma = '\t'
rows, err := reader.ReadAll()
printError(err)
bulkRequest := client.Bulk()
for n, col := range rows {
n++
id := uuid.NewV4().String()
if n <= TOTAL_ROWS {
lat, err := strconv.ParseFloat(col[9], 64)
printError(err)
lon, err := strconv.ParseFloat(col[10], 64)
printError(err)
document := Document{
Ciudad: col[3],
Colonia: col[2],
Cp: col[1],
Delegacion: col[5],
Location: Location{
Lat: lat,
Lon: lon,
},
}
req := elastic.NewBulkIndexRequest().Index(os.Getenv("ELASTICSEARCH_INDEX")).Type(os.Getenv("ELASTICSEARCH_TYPE")).Id(id).Doc(document)
bulkRequest = bulkRequest.Add(req)
fmt.Printf("%v: %v\n", n, document)
}
}
bulkResponse, err := bulkRequest.Do(ctx)
printError(err)
indexed := bulkResponse.Indexed()
if len(indexed) != 1 {
fmt.Printf("\n Indexed documents: %v \n", len(indexed))
}
}
func printError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment