Skip to content

Instantly share code, notes, and snippets.

@olivere
Created January 16, 2016 11:17
Show Gist options
  • Save olivere/a1dd52fc28cdfbbd6d4f to your computer and use it in GitHub Desktop.
Save olivere/a1dd52fc28cdfbbd6d4f to your computer and use it in GitHub Desktop.
Example #1 of bulk processor usage
// This is an example of using elastic's BulkProcessor with Elasticsearch.
//
// See https://github.com/olivere/elastic and
// and https://github.com/olivere/elastic/wiki/BulkProcessor
// for more details.
/*
* This example illustrates a simple process that performs bulk processing
* with Elasticsearch using the BulkProcessor in elastic.
*
* It sets up a Bulker that runs a loop that will send data into
* Elasticsearch. A second goroutine is started to periodically print
* statistics about the process, e.g. the number of successful/failed
* bulk commits.
*
* If you stop your cluster, the Bulker will find out as the After callback
* gets an error passed. As soon as that happens, Bulker stops sending
* new data into the BulkProcessor.
*
* When you start your cluster again, Bulker will also find out because it
* has set up an automatic flush interval. This flush will eventually succeed
* and the After callback will be called again, this time without an error.
* The Bulker picks that up and starts sending data again.
*
* You can pass several parameters to the process:
* The "url" parameter is the Elasticsearch HTTP endpoint (http://127.0.0.1:9200 by default).
* The "index" parameter allows you to specify the Elasticsearch index name ("bulker-test" by default).
* The "worker" parameter allows you to specify the number of parallel workers
* the BulkProcessor gets started with.
*
*/
package main
import (
"bytes"
"errors"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
elastic "gopkg.in/olivere/elastic.v3"
)
// Main entry point.
func main() {
var (
url = flag.String("url", "http://localhost:9200", "Elasticsearch URL.")
index = flag.String("index", "bulker-test", "Index name.")
workers = flag.Int("workers", 0, "Number of workers.")
)
flag.Parse()
if *url == "" {
log.Fatal("missing url")
}
if *index == "" {
log.Fatal("missing index name")
}
rand.Seed(time.Now().UnixNano())
client, err := elastic.NewClient(elastic.SetURL(*url))
if err != nil {
log.Fatal(err)
}
defer client.Stop()
errc := make(chan error)
// Run the bulker
b := &Bulker{c: client, workers: *workers, index: *index}
err = b.Run()
if err != nil {
log.Fatal(err)
}
defer b.Close()
// Run the statistics printer
go func(b *Bulker) {
for range time.Tick(1 * time.Second) {
printStats(b)
}
}(b)
// Watch for SIGINT and SIGTERM from the console.
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("caught signal %v\n", <-c)
errc <- nil
}()
// Wait for problems.
if err := <-errc; err != nil {
log.Print(err)
os.Exit(1)
}
}
// printStats retrieves statistics from the Bulker and prints them.
func printStats(b *Bulker) {
stats := b.Stats()
var buf bytes.Buffer
for i, w := range stats.Workers {
if i > 0 {
buf.WriteString(" ")
}
buf.WriteString(fmt.Sprintf("%d=[%04d]", i, w.Queued))
}
fmt.Printf("%s | calls B=%04d,A=%04d,S=%04d,F=%04d | stats I=%05d,S=%05d,F=%05d | %v\n",
time.Now().Format("15:04:05"),
b.beforeCalls,
b.afterCalls,
b.successCalls,
b.failureCalls,
stats.Indexed,
stats.Succeeded,
stats.Failed,
buf.String())
}
// -- Bulker --
// Bulker is an example for a process that needs to push data into
// Elasticsearch via BulkProcessor.
type Bulker struct {
c *elastic.Client
p *elastic.BulkProcessor
workers int
index string
beforeCalls int64 // # of calls into before callback
afterCalls int64 // # of calls into after callback
failureCalls int64 // # of successful calls into after callback
successCalls int64 // # of successful calls into after callback
seq int64 // sequential id
stopC chan struct{} // stop channel for the indexer
throttleMu sync.Mutex // guards the following block
throttle bool // throttle (or stop) sending data into bulk processor?
}
// Run starts the Bulker.
func (b *Bulker) Run() error {
// Recreate Elasticsearch index
if err := b.ensureIndex(); err != nil {
return err
}
// Start bulk processor
p, err := b.c.BulkProcessor().
Workers(b.workers). // # of workers
BulkActions(1000). // # of queued requests before committed
BulkSize(4096). // # of bytes in requests before committed
FlushInterval(10 * time.Second). // autocommit every 10 seconds
Stats(true). // gather statistics
Before(b.before). // call "before" before every commit
After(b.after). // call "after" after every commit
Do()
if err != nil {
return err
}
b.p = p
// Start indexer that pushes data into bulk processor
b.stopC = make(chan struct{})
go b.indexer()
return nil
}
// Close the bulker.
func (b *Bulker) Close() error {
b.stopC <- struct{}{}
<-b.stopC
close(b.stopC)
return nil
}
// indexer is a goroutine that periodically pushes data into
// bulk processor unless being "throttled" or "stopped".
func (b *Bulker) indexer() {
var stop bool
for !stop {
select {
case <-b.stopC:
stop = true
default:
b.throttleMu.Lock()
throttled := b.throttle
b.throttleMu.Unlock()
if !throttled {
// Sample data structure
doc := struct {
Seq int64 `json:"seq"`
}{
Seq: atomic.AddInt64(&b.seq, 1),
}
// Add bulk request.
// Notice that we need to set Index and Type here!
r := elastic.NewBulkIndexRequest().Index(b.index).Type("doc").Doc(doc)
b.p.Add(r)
}
// Sleep for a short time.
time.Sleep(time.Duration(rand.Intn(7)) * time.Millisecond)
}
}
b.stopC <- struct{}{} // ack stopping
}
// before is invoked from bulk processor before every commit.
func (b *Bulker) before(id int64, requests []elastic.BulkableRequest) {
atomic.AddInt64(&b.beforeCalls, 1)
}
// after is invoked by bulk processor after every commit.
// The err variable indicates success or failure.
func (b *Bulker) after(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
atomic.AddInt64(&b.afterCalls, 1)
b.throttleMu.Lock()
if err != nil {
atomic.AddInt64(&b.failureCalls, 1)
b.throttle = true // bulk processor in trouble
} else {
atomic.AddInt64(&b.successCalls, 1)
b.throttle = false // bulk processor ok
}
b.throttleMu.Unlock()
}
// Stats returns statistics from bulk processor.
func (b *Bulker) Stats() elastic.BulkProcessorStats {
return b.p.Stats()
}
// ensureIndex creates the index in Elasticsearch.
// It will be dropped if it already exists.
func (b *Bulker) ensureIndex() error {
if b.index == "" {
return errors.New("no index name")
}
exists, err := b.c.IndexExists(b.index).Do()
if err != nil {
return err
}
if exists {
_, err = b.c.DeleteIndex(b.index).Do()
if err != nil {
return err
}
}
_, err = b.c.CreateIndex(b.index).Do()
if err != nil {
return err
}
return nil
}
@valer-cara
Copy link

@olivere - What's the purpose of the short sleep in indexer() ?

@vancexu
Copy link

vancexu commented Dec 11, 2018

@olivere - What's the purpose of the short sleep in indexer() ?

I think it's just a fake throttle to avoid inserting too many test data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment