Last active
January 24, 2017 16:05
-
-
Save noerw/23d0e6275e31f2fc1c1ce14ed8ef79aa to your computer and use it in GitHub Desktop.
migration script in go/mgo for the openSenseMap. adding support for mobile boxes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
TODO:
- optimize MongoDB performance
- embed in a DB migration framework: https://github.com/mattes/migrate
- filter measurements to update by date
Optimization IDEAS
- multithreaded: 4 workers instead of 1?
-> my laptop handles 3; 50% faster
- drop indexes? https://docs.mongodb.com/v3.2/core/write-performance/ -> better write performance
-> minimal effect (~3% faster)
- update per sensor_id instead of per box -> better read performance
-> no improvement
- ..?
*/
package main | |
import (
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)
type Sensor struct { | |
ID bson.ObjectId `bson:"_id"` | |
// rest is irrelevant. add here to populate on query | |
} | |
type OldBox struct { | |
ID bson.ObjectId `bson:"_id,omitempty"` | |
CreatedAt time.Time `bson:"createdAt"` | |
Name string `bson:"name,omitempty"` | |
Loc []map[string]interface{} `bson:"loc,omitempty"` | |
Sensors []Sensor `bson:"sensors,omitempty"` | |
} | |
// makeMongoConnString assembles the MongoDB connection URL for the
// "OSeM-api" database on localhost:27017.
//
// Credentials are taken from the environment: when OSEM_dbuser is non-empty
// it is placed in front of the host, followed by ":" and OSEM_dbuserpass if
// that is non-empty as well. Without a user name no credentials are added,
// even if a password is set.
func makeMongoConnString() (conn string) {
	credentials := ""
	if user := os.Getenv("OSEM_dbuser"); user != "" {
		credentials = user
		if pass := os.Getenv("OSEM_dbuserpass"); pass != "" {
			credentials += ":" + pass
		}
		credentials += "@"
	}

	// An "?authSource=OSeM-api" option is deliberately not appended.
	conn = "mongodb://" + credentials + "localhost:27017" + "/OSeM-api"
	return conn
}
func updateBox(box *OldBox, mgoSession *mgo.Session, waitGroup *sync.WaitGroup, throttle chan int) (err error) { | |
defer waitGroup.Done() // decrease waitgroup counter | |
defer func() { <-throttle }() // decrease concurrency throttle counter | |
s := mgoSession.Copy() | |
defer s.Close() | |
log.Printf("processing box %s", box.ID) | |
boxes := s.DB("").C("boxes") | |
locations := s.DB("").C("locations") | |
measurements := s.DB("").C("measurements") | |
/* create location */ | |
newLocation := bson.M{ | |
"_id": bson.NewObjectId(), | |
"box": box.ID, | |
"timestamp": box.ID.Time(), | |
"loc": box.Loc[0]["geometry"], | |
} | |
err = locations.Insert(newLocation) | |
if err != nil { | |
log.Panicf("Couldn't insert location for %s: %v", box.ID, err) | |
} | |
/* update box */ | |
boxChange := bson.M{ | |
"$unset": bson.M{"type": "", "loc": ""}, | |
"$set": bson.M{"location": newLocation["_id"]}, | |
} | |
err = boxes.UpdateId(box.ID, boxChange) | |
if err != nil { | |
log.Panicf("Couldn't update box %s: %v", box.ID, err) | |
} | |
/* update measurements */ | |
sensors := make([]bson.ObjectId, 0, len(box.Sensors)) // contains sensorIDs | |
for _, sensor := range box.Sensors { | |
sensors = append(sensors, sensor.ID) | |
} | |
measureSelect := bson.M{"sensor_id": bson.M{"$in": sensors}} | |
measureUpdate := bson.M{"$set": bson.M{"location": newLocation["_id"]}} | |
_, err = measurements.UpdateAll(measureSelect, measureUpdate) | |
if err != nil { | |
log.Panicf("Couldn't update measurements for %s: %v", box.ID, err) | |
} | |
return | |
} | |
func main() { | |
log.SetFlags(log.Ltime | log.Lmicroseconds) | |
// establish session | |
connString := makeMongoConnString() | |
log.Printf("connecting to MongoDB at %s ...", connString) | |
session, err := mgo.DialWithTimeout(connString, 2*time.Minute) | |
if err != nil { | |
log.Panicf("Couldn't connect to MongoDB at %s: %v", connString, err) | |
} | |
defer session.Close() | |
session.SetMode(mgo.Monotonic, true) // ?? https://godoc.org/gopkg.in/mgo.v2#Mode | |
buildinfo, err := session.BuildInfo() | |
if err != nil { | |
panic(err) | |
} | |
if buildinfo.VersionAtLeast(3, 2) == false { | |
log.Fatal("MongoDB 3.2 or higher required!") | |
} | |
log.Print("connected.") | |
// Collections | |
boxes := session.DB("").C("boxes") | |
locations := session.DB("").C("locations") | |
measurements := session.DB("").C("measurements") | |
// get counts | |
numBoxes, err := boxes.Count() | |
numLocations, err := locations.Count() | |
numMeasurements, err := measurements.Count() | |
if err != nil { | |
log.Panicf("error getting collection counts: %v", err) | |
} | |
log.Printf("boxes: %v\n\tlocations: %v\n\tmeasurements: %v", | |
numBoxes, numLocations, numMeasurements) | |
// iterate over all boxes & process them concurrently | |
startTime := time.Now() | |
var wg sync.WaitGroup | |
const concurrency = 3 | |
var throttle = make(chan int, concurrency) | |
var boxList []OldBox | |
boxes.Find(nil).All(&boxList) | |
//boxes.Find(bson.M{"grouptag": "ifgi"}).All(&boxList) | |
for _, box := range boxList { | |
wg.Add(1) | |
throttle <- 1 | |
go updateBox(&box, session, &wg, throttle) | |
} | |
wg.Wait() | |
log.Printf("Done :^) required time: %s", | |
time.Since(startTime).String()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment