Created
March 2, 2012 11:29
-
-
Save kortschak/1957898 to your computer and use it in GitHub Desktop.
morass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Use morass when you don't want your data to be a quagmire.
//
// Package morass sorts data sets larger than can fit in memory.
//
// morass /məˈras/
//  1. An area of muddy or boggy ground.
//  2. A complicated or confused situation.
package morass | |
// Copyright ©2011 Dan Kortschak <[email protected]> | |
// | |
// This program is free software: you can redistribute it and/or modify | |
// it under the terms of the GNU General Public License as published by | |
// the Free Software Foundation, either version 3 of the License, or | |
// (at your option) any later version. | |
// | |
// This program is distributed in the hope that it will be useful, | |
// but WITHOUT ANY WARRANTY; without even the implied warranty of | |
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
// GNU General Public License for more details. | |
// | |
// You should have received a copy of the GNU General Public License | |
// along with this program. If not, see <http://www.gnu.org/licenses/>. | |
import ( | |
"container/heap" | |
"encoding/gob" | |
"errors" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"os" | |
"reflect" | |
"runtime" | |
"sort" | |
"sync" | |
) | |
// Package-level gob registration state, always accessed with m held:
// registered records every dynamic type already passed to register, and
// nextID is the number used to build the next short gob name.
var (
	m          = new(sync.Mutex)
	registered = map[reflect.Type]struct{}{}
	nextID     int
)
func register(e interface{}, t reflect.Type) { | |
m.Lock() | |
defer m.Unlock() | |
defer func() { | |
recover() // The only panic that we can get is from trying to register a base type. | |
registered[t] = struct{}{} // Remember for next time. | |
}() | |
if _, ok := registered[t]; !ok { | |
registered[t] = struct{}{} | |
gob.RegisterName(fmt.Sprintf("ℳ%d", nextID), e) | |
nextID++ | |
} | |
} | |
// LessInterface is implemented by values that can be sorted by a Morass.
// Less reports whether the receiver sorts before other. When called by a
// Morass, other holds a value of the same dynamic type as the receiver
// (Push rejects values of any other type).
type LessInterface interface {
	Less(other interface{}) bool
}
type sortable []LessInterface | |
func (self sortable) Len() int { return len(self) } | |
func (self sortable) Less(i, j int) bool { return self[i].Less(self[j]) } | |
func (self *sortable) Swap(i, j int) { (*self)[i], (*self)[j] = (*self)[j], (*self)[i] } | |
type file struct { | |
head LessInterface | |
file *os.File | |
encoder *gob.Encoder | |
decoder *gob.Decoder | |
} | |
type files []*file | |
func (self files) Len() int { return len(self) } | |
func (self files) Less(i, j int) bool { return self[i].head.Less(self[j].head) } | |
func (self *files) Swap(i, j int) { (*self)[i], (*self)[j] = (*self)[j], (*self)[i] } | |
func (self *files) Pop() (i interface{}) { | |
i = (*self)[len(*self)-1] | |
*self = (*self)[:len(*self)-1] | |
return | |
} | |
func (self *files) Push(x interface{}) { *self = append(*self, x.(*file)) } | |
// Type to manage sorting very large data sets. | |
// Setting AutoClean to true causes the Morass to delete temporary sort files | |
// when they are depleted. | |
type Morass struct { | |
mutex sync.Mutex | |
t reflect.Type | |
pos, length int64 | |
chunk sortable | |
done chan sortable | |
err chan error | |
prefix string | |
dir string | |
files files | |
finalised bool | |
fast bool | |
AutoClean bool | |
} | |
// Create a new Morass. prefix and dir are passed to ioutil.TempDir. chunkSize specifies | |
// the amount of sorting to be done in memory, concurrent specifies that temporary file | |
// writing occurs concurrently with sorting. | |
// An error is returned if no temporary directory can be created. | |
func New(e interface{}, prefix, dir string, chunkSize int, concurrent bool) (*Morass, error) { | |
d, err := ioutil.TempDir(dir, prefix) | |
if err != nil { | |
return nil, err | |
} | |
m := &Morass{ | |
prefix: prefix, | |
dir: d, | |
done: make(chan sortable, 1), | |
files: make(files, 0), | |
err: make(chan error, 1), | |
} | |
m.t = reflect.TypeOf(e) | |
register(e, m.t) | |
m.chunk = make(sortable, 0, chunkSize) | |
if concurrent { | |
m.done <- make(sortable, 0) | |
} | |
f := func(self *Morass) { | |
if self.AutoClean { | |
self.CleanUp() | |
} | |
} | |
runtime.SetFinalizer(m, f) | |
return m, nil | |
} | |
// Push a value on to the Morass. Note that the underlying gob encoder is given the | |
// name ℳ when the type registered to avoid using too much space. | |
// Returns any error that occurs. | |
func (self *Morass) Push(e LessInterface) (err error) { | |
if t := reflect.TypeOf(e); t != self.t { | |
return errors.New(fmt.Sprintf("Type mismatch: %s != %s", t, self.t)) | |
} | |
self.mutex.Lock() | |
defer self.mutex.Unlock() | |
select { | |
case err = <-self.err: | |
if err != nil { | |
return | |
} | |
default: | |
} | |
if self.finalised { | |
return errors.New("Push on finalised morass") | |
} | |
if c := cap(self.chunk); len(self.chunk) == c { | |
go self.write(self.chunk) | |
self.chunk = <-self.done | |
if cap(self.chunk) == 0 { | |
self.chunk = make(sortable, 0, c) | |
} | |
} | |
self.chunk = append(self.chunk, e) | |
self.pos++ | |
self.length++ | |
return | |
} | |
func (self *Morass) write(writing sortable) (err error) { | |
defer func() { | |
self.err <- err | |
self.done <- writing[:0] | |
}() | |
select { | |
case <-self.err: | |
default: | |
} | |
sort.Sort(&writing) | |
var tf *os.File | |
if tf, err = ioutil.TempFile(self.dir, self.prefix); err != nil { | |
return | |
} | |
enc := gob.NewEncoder(tf) | |
dec := gob.NewDecoder(tf) | |
f := &file{head: nil, file: tf, encoder: enc, decoder: dec} | |
self.files = append(self.files, f) | |
for _, e := range writing { | |
if err = enc.Encode(&e); err != nil { | |
return | |
} | |
} | |
err = tf.Sync() | |
return | |
} | |
// Return the corrent position of the cursor in the Morass. | |
func (self *Morass) Pos() int64 { return self.pos } | |
// Return the corrent length of the Morass. | |
func (self *Morass) Len() int64 { return self.length } | |
// Indicate that the last element has been pushed on to the Morass and write out final data. | |
// Returns any error that occurs. | |
func (self *Morass) Finalise() (err error) { | |
self.mutex.Lock() | |
defer self.mutex.Unlock() | |
select { | |
case err = <-self.err: | |
if err != nil { | |
return | |
} | |
default: | |
} | |
if !self.finalised { | |
if self.pos < int64(cap(self.chunk)) { | |
self.fast = true | |
sort.Sort(&self.chunk) | |
} else { | |
if len(self.chunk) > 0 { | |
go self.write(self.chunk) | |
err = <-self.err | |
} | |
} | |
self.pos = 0 | |
self.finalised = true | |
} else { | |
return nil | |
} | |
if !self.fast { | |
for _, f := range self.files { | |
if _, err = f.file.Seek(0, 0); err != nil { | |
return | |
} | |
if err = f.decoder.Decode(&f.head); err != nil && err != io.EOF { | |
return | |
} | |
} | |
heap.Init(&self.files) | |
} | |
return nil | |
} | |
// Reset the Morass to an empty state. | |
// Returns any error that occurs. | |
func (self *Morass) Clear() (err error) { | |
self.mutex.Lock() | |
defer self.mutex.Unlock() | |
for _, f := range self.files { | |
if err = f.file.Close(); err != nil { | |
return | |
} | |
if err = os.Remove(f.file.Name()); err != nil { | |
return | |
} | |
} | |
self.files = self.files[:0] | |
self.pos = 0 | |
self.length = 0 | |
self.finalised = false | |
self.chunk = self.chunk[:0] | |
return | |
} | |
// Delete the file system components of the Morass. After this call the Morass is not usable. | |
// Returns any error that occurs. | |
func (self *Morass) CleanUp() (err error) { | |
self.mutex.Lock() | |
defer self.mutex.Unlock() | |
return os.RemoveAll(self.dir) | |
} | |
// Set the settable value e to the lowest value in the Morass. | |
// io.EOF indicate the Morass is empty. Any other error results in no value being set on e. | |
func (self *Morass) Pull(e LessInterface) (err error) { | |
self.mutex.Lock() | |
defer self.mutex.Unlock() | |
if len(self.chunk) == 0 { | |
return io.EOF | |
} | |
v := reflect.ValueOf(e) | |
if !reflect.Indirect(v).CanSet() { | |
return errors.New("morass: Cannot set e") | |
} | |
if self.fast { | |
if self.pos < int64(len(self.chunk)) { | |
e = self.chunk[self.pos].(LessInterface) | |
self.pos++ | |
err = nil | |
} else { | |
err = io.EOF | |
} | |
} else { | |
if self.files.Len() > 0 { | |
low := heap.Pop(&self.files).(*file) | |
e = low.head | |
self.pos++ | |
switch err = low.decoder.Decode(&low.head); err { | |
case nil: | |
heap.Push(&self.files, low) | |
case io.EOF: | |
err = nil | |
fallthrough | |
default: | |
low.file.Close() | |
if self.AutoClean { | |
os.Remove(low.file.Name()) | |
} | |
} | |
} else { | |
if self.AutoClean { | |
os.RemoveAll(self.dir) | |
} | |
err = io.EOF | |
} | |
} | |
if err != io.EOF { | |
reflect.Indirect(v).Set(reflect.ValueOf(e)) | |
} | |
return | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package morass | |
// Copyright ©2011 Dan Kortschak <[email protected]> | |
// | |
// This program is free software: you can redistribute it and/or modify | |
// it under the terms of the GNU General Public License as published by | |
// the Free Software Foundation, either version 3 of the License, or | |
// (at your option) any later version. | |
// | |
// This program is distributed in the hope that it will be useful, | |
// but WITHOUT ANY WARRANTY; without even the implied warranty of | |
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
// GNU General Public License for more details. | |
// | |
// You should have received a copy of the GNU General Public License | |
// along with this program. If not, see <http://www.gnu.org/licenses/>. | |
import ( | |
"io" | |
check "launchpad.net/gocheck" | |
"math/rand" | |
"testing" | |
) | |
const minInt = -int(^uint(0)>>1) - 1 | |
// Tests | |
func Test(t *testing.T) { check.TestingT(t) } | |
type S struct{} | |
const ( | |
chunk = 100 | |
testLen = 10000 | |
) | |
var _ = check.Suite(&S{}) | |
// intLesser is an int ordered by the usual < relation.
type intLesser int

// Less reports whether i is smaller than j, which must be an intLesser.
func (i intLesser) Less(j interface{}) bool {
	other := j.(intLesser)
	return i < other
}
// structLesser is a two-field struct ordered by its A field alone; B does not
// take part in comparisons.
type structLesser struct {
	A, B int
}

// Less reports whether i.A is smaller than the A field of j, which must be a
// structLesser.
func (i structLesser) Less(j interface{}) bool {
	other := j.(structLesser)
	return i.A < other.A
}
func (s *S) TestMorass(c *check.C) { | |
for _, concurrent := range []bool{false, true} { | |
if m, err := New(intLesser(0), "", "", chunk, concurrent); err != nil { | |
c.Fatalf("New Morass failed: %v", err) | |
} else { | |
for i := 0; i < testLen; i++ { | |
if err = m.Push(intLesser(rand.Int())); err != nil { | |
c.Fatalf("Push %d failed: %v", i, err) | |
c.Check(int64(i), check.Equals, m.Pos()) | |
} | |
} | |
if err = m.Finalise(); err != nil { | |
c.Fatalf("Finalise failed: %v", err) | |
} | |
c.Check(m.Len(), check.Equals, int64(testLen)) | |
L: | |
for i := 0; i <= testLen; i++ { | |
var v intLesser | |
lv := intLesser(minInt) | |
c.Check(int64(i), check.Equals, m.Pos()) | |
switch err = m.Pull(&v); err { | |
case nil: | |
c.Check(v.Less(lv), check.Equals, false) | |
case io.EOF: | |
break L | |
default: | |
c.Fatalf("Pull failed: %v", err) | |
} | |
} | |
if err = m.CleanUp(); err != nil { | |
c.Fatalf("CleanUp failed: %v", err) | |
} | |
} | |
} | |
} | |
func (s *S) TestReuse1(c *check.C) { | |
if m, err := New(intLesser(0), "", "", chunk, false); err != nil { | |
c.Fatalf("New Morass failed: %v", err) | |
} else { | |
for r := 0; r < 2; r++ { | |
for i := 0; i < testLen; i++ { | |
if err = m.Push(intLesser(rand.Int())); err != nil { | |
c.Fatalf("Push %d failed on repeat %d : %v", i, r, err) | |
c.Check(int64(i), check.Equals, m.Pos()) | |
} | |
} | |
if err = m.Finalise(); err != nil { | |
c.Fatalf("Finalise failed on repeat %d: %v", r, err) | |
} | |
c.Check(m.Len(), check.Equals, int64(testLen)) | |
L: | |
for i := 0; i <= testLen; i++ { | |
var v intLesser | |
lv := intLesser(minInt) | |
c.Check(int64(i), check.Equals, m.Pos()) | |
switch err = m.Pull(&v); err { | |
case nil: | |
c.Check(v.Less(lv), check.Equals, false) | |
case io.EOF: | |
break L | |
default: | |
c.Fatalf("Pull failed on repeat %d: %v", r, err) | |
} | |
} | |
if err = m.Clear(); err != nil { | |
c.Fatalf("Clear failed on repeat %d: %v", r, err) | |
} | |
} | |
if err = m.CleanUp(); err != nil { | |
c.Fatalf("CleanUp failed: %v", err) | |
} | |
} | |
} | |
func (s *S) TestReuser2(c *check.C) { | |
if m, err := New(structLesser{}, "", "", chunk, false); err != nil { | |
c.Fatalf("New Morass failed: %v", err) | |
} else { | |
for r := 0; r < 2; r++ { | |
for i := 0; i < testLen; i++ { | |
if err = m.Push(structLesser{rand.Int(), rand.Int()}); err != nil { | |
c.Fatalf("Push %d failed on repeat %d : %v", i, r, err) | |
c.Check(int64(i), check.Equals, m.Pos()) | |
} | |
} | |
if err = m.Finalise(); err != nil { | |
c.Fatalf("Finalise failed on repeat %d: %v", r, err) | |
} | |
c.Check(m.Len(), check.Equals, int64(testLen)) | |
L: | |
for i := 0; i <= testLen; i++ { | |
var v structLesser | |
lv := structLesser{minInt, 0} | |
c.Check(int64(i), check.Equals, m.Pos()) | |
switch err = m.Pull(&v); err { | |
case nil: | |
c.Check(v.Less(lv), check.Equals, false) | |
case io.EOF: | |
break L | |
default: | |
c.Fatalf("Pull failed on repeat %d: %v", r, err) | |
} | |
} | |
if err = m.Clear(); err != nil { | |
c.Fatalf("Clear failed on repeat %d: %v", r, err) | |
} | |
} | |
if err = m.CleanUp(); err != nil { | |
c.Fatalf("CleanUp failed: %v", err) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment