Skip to content

Instantly share code, notes, and snippets.

@noerw
Last active January 24, 2017 16:05
Show Gist options
  • Save noerw/23d0e6275e31f2fc1c1ce14ed8ef79aa to your computer and use it in GitHub Desktop.
Save noerw/23d0e6275e31f2fc1c1ce14ed8ef79aa to your computer and use it in GitHub Desktop.
migration script in go/mgo for the openSenseMap. adding support for mobile boxes
/*
TODO:
- mongo performance optimieren
- in db migration framework einbetten: https://github.com/mattes/migrate
- filter measurements to update by date
IDEAS zur optimierung
- multithreaded. 4 statt 1?
-> 3 packt mein laptop, 50% schneller
- indexes entfernen? https://docs.mongodb.com/v3.2/core/write-performance/ -> besserer write
-> minimal effect (~3% schneller)
- pro sensor_id updaten, statt pro box -> besserer read
-> no improvement
- ..?
*/
package main
import (
"log"
"os"
"sync"
"time"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
type Sensor struct {
ID bson.ObjectId `bson:"_id"`
// rest is irrelevant. add here to populate on query
}
type OldBox struct {
ID bson.ObjectId `bson:"_id,omitempty"`
CreatedAt time.Time `bson:"createdAt"`
Name string `bson:"name,omitempty"`
Loc []map[string]interface{} `bson:"loc,omitempty"`
Sensors []Sensor `bson:"sensors,omitempty"`
}
func makeMongoConnString() (conn string) {
conn = "mongodb://"
user := os.Getenv("OSEM_dbuser")
pass := os.Getenv("OSEM_dbuserpass")
if user != "" {
conn += user
if pass != "" {
conn += ":" + pass
}
conn += "@"
}
conn += "localhost:27017"
conn += "/OSeM-api"
//conn += "?authSource=OSeM-api"
return
}
func updateBox(box *OldBox, mgoSession *mgo.Session, waitGroup *sync.WaitGroup, throttle chan int) (err error) {
defer waitGroup.Done() // decrease waitgroup counter
defer func() { <-throttle }() // decrease concurrency throttle counter
s := mgoSession.Copy()
defer s.Close()
log.Printf("processing box %s", box.ID)
boxes := s.DB("").C("boxes")
locations := s.DB("").C("locations")
measurements := s.DB("").C("measurements")
/* create location */
newLocation := bson.M{
"_id": bson.NewObjectId(),
"box": box.ID,
"timestamp": box.ID.Time(),
"loc": box.Loc[0]["geometry"],
}
err = locations.Insert(newLocation)
if err != nil {
log.Panicf("Couldn't insert location for %s: %v", box.ID, err)
}
/* update box */
boxChange := bson.M{
"$unset": bson.M{"type": "", "loc": ""},
"$set": bson.M{"location": newLocation["_id"]},
}
err = boxes.UpdateId(box.ID, boxChange)
if err != nil {
log.Panicf("Couldn't update box %s: %v", box.ID, err)
}
/* update measurements */
sensors := make([]bson.ObjectId, 0, len(box.Sensors)) // contains sensorIDs
for _, sensor := range box.Sensors {
sensors = append(sensors, sensor.ID)
}
measureSelect := bson.M{"sensor_id": bson.M{"$in": sensors}}
measureUpdate := bson.M{"$set": bson.M{"location": newLocation["_id"]}}
_, err = measurements.UpdateAll(measureSelect, measureUpdate)
if err != nil {
log.Panicf("Couldn't update measurements for %s: %v", box.ID, err)
}
return
}
func main() {
log.SetFlags(log.Ltime | log.Lmicroseconds)
// establish session
connString := makeMongoConnString()
log.Printf("connecting to MongoDB at %s ...", connString)
session, err := mgo.DialWithTimeout(connString, 2*time.Minute)
if err != nil {
log.Panicf("Couldn't connect to MongoDB at %s: %v", connString, err)
}
defer session.Close()
session.SetMode(mgo.Monotonic, true) // ?? https://godoc.org/gopkg.in/mgo.v2#Mode
buildinfo, err := session.BuildInfo()
if err != nil {
panic(err)
}
if buildinfo.VersionAtLeast(3, 2) == false {
log.Fatal("MongoDB 3.2 or higher required!")
}
log.Print("connected.")
// Collections
boxes := session.DB("").C("boxes")
locations := session.DB("").C("locations")
measurements := session.DB("").C("measurements")
// get counts
numBoxes, err := boxes.Count()
numLocations, err := locations.Count()
numMeasurements, err := measurements.Count()
if err != nil {
log.Panicf("error getting collection counts: %v", err)
}
log.Printf("boxes: %v\n\tlocations: %v\n\tmeasurements: %v",
numBoxes, numLocations, numMeasurements)
// iterate over all boxes & process them concurrently
startTime := time.Now()
var wg sync.WaitGroup
const concurrency = 3
var throttle = make(chan int, concurrency)
var boxList []OldBox
boxes.Find(nil).All(&boxList)
//boxes.Find(bson.M{"grouptag": "ifgi"}).All(&boxList)
for _, box := range boxList {
wg.Add(1)
throttle <- 1
go updateBox(&box, session, &wg, throttle)
}
wg.Wait()
log.Printf("Done :^) required time: %s",
time.Since(startTime).String())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment