Created
March 14, 2019 13:37
-
-
Save joonas-fi/c56c0f6fa23edac436609af3b22a5c46 to your computer and use it in GitHub Desktop.
Storm perf issue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/asdine/storm" | |
"github.com/asdine/storm/codec/msgpack" | |
"os" | |
"encoding/json" | |
"io" | |
"fmt" | |
"bufio" | |
) | |
// Run this with: $ ./stormexample < export.log
// The export log is a file with 587900 lines (a much smaller file might demonstrate this as well) of
// data like this:
//
// {"Ref":"AAAtrqmSAynWdvl/C+VDH/qvYBjalOai6fSYX708iFw=","Volumes":[1],"VolumesPendingReplication":[2],"IsPendingReplication":true,"Referenced":true}
// {"Ref":"AAAyIKB9cG/Lkp5Ja1FGRMfETTX7RKOrse0xlX4+lQc=","Volumes":[1,2],"VolumesPendingReplication":[],"IsPendingReplication":false,"Referenced":true}
func main() { | |
if err := importDb(os.Stdin); err != nil { | |
panic(err) | |
} | |
} | |
// Blob is one record of the export log: it is decoded from a single JSON line
// and saved to the database as-is. Ref carries the `storm:"id"` tag, so it is
// the field Storm uses as the record's ID.
type Blob struct {
	Ref                                []byte `storm:"id"`
	Volumes, VolumesPendingReplication []int
	IsPendingReplication, Referenced   bool
}
func importDbInternal(content io.Reader, withTx func(fn func(tx storm.Node) error) error) error { | |
scanner := bufio.NewScanner(content) | |
// by default craps out on lines > 64k. set max line to many megabytes | |
buf := make([]byte, 0, 8*1024*1024) | |
scanner.Buffer(buf, cap(buf)) | |
for scanner.Scan() { | |
line := scanner.Text() | |
if line == "" { | |
continue | |
} | |
// init empty record | |
record := &Blob{} | |
if err := json.Unmarshal([]byte(line), record); err != nil { | |
return err | |
} | |
if err := withTx(func(tx storm.Node) error { | |
return tx.Save(record) | |
}); err != nil { | |
return err | |
} | |
} | |
if err := scanner.Err(); err != nil { | |
return err | |
} | |
return nil | |
} | |
func importDb(content io.Reader) error { | |
db, err := storm.Open("stormexample.db", storm.Codec(msgpack.Codec)) | |
if err != nil { | |
return err | |
} | |
defer db.Close() | |
var openTx storm.Node | |
commitOpenTx := func() error { | |
if openTx == nil { | |
return nil | |
} | |
return openTx.Commit() | |
} | |
txUseCount := 0 | |
// automatically commits every N calls | |
withTx := func(fn func(tx storm.Node) error) error { | |
txUseCount++ | |
if (txUseCount % 1000) == 0 { | |
if err := commitOpenTx(); err != nil { | |
return err | |
} | |
openTx = nil | |
fmt.Printf(".") | |
} | |
if openTx == nil { | |
var errTxOpen error | |
openTx, errTxOpen = db.Begin(true) | |
if errTxOpen != nil { | |
return errTxOpen | |
} | |
} | |
return fn(openTx) | |
} | |
defer func() { | |
if openTx == nil { | |
return | |
} | |
if err := openTx.Rollback(); err != nil { | |
panic(fmt.Errorf("rollback failed: %v", err)) | |
} | |
}() | |
if err := importDbInternal(content, withTx); err != nil { | |
return err | |
} | |
return commitOpenTx() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment