Skip to content

Instantly share code, notes, and snippets.

@kortschak
Created March 2, 2012 11:29
Show Gist options
  • Save kortschak/1957898 to your computer and use it in GitHub Desktop.
Save kortschak/1957898 to your computer and use it in GitHub Desktop.
morass
//Use morass when you don't want your data to be a quagmire.
//
//Sort data larger than can fit in memory.
//
// morass məˈras/
// 1. An area of muddy or boggy ground.
// 2. A complicated or confused situation.
package morass
// Copyright ©2011 Dan Kortschak <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
import (
"container/heap"
"encoding/gob"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"reflect"
"runtime"
"sort"
"sync"
)
var (
m = &sync.Mutex{}
registered = make(map[reflect.Type]struct{})
nextID = 0
)
func register(e interface{}, t reflect.Type) {
m.Lock()
defer m.Unlock()
defer func() {
recover() // The only panic that we can get is from trying to register a base type.
registered[t] = struct{}{} // Remember for next time.
}()
if _, ok := registered[t]; !ok {
registered[t] = struct{}{}
gob.RegisterName(fmt.Sprintf("ℳ%d", nextID), e)
nextID++
}
}
// Is the receiver less than the parameterised interface
type LessInterface interface {
Less(i interface{}) bool
}
type sortable []LessInterface
func (self sortable) Len() int { return len(self) }
func (self sortable) Less(i, j int) bool { return self[i].Less(self[j]) }
func (self *sortable) Swap(i, j int) { (*self)[i], (*self)[j] = (*self)[j], (*self)[i] }
type file struct {
head LessInterface
file *os.File
encoder *gob.Encoder
decoder *gob.Decoder
}
type files []*file
func (self files) Len() int { return len(self) }
func (self files) Less(i, j int) bool { return self[i].head.Less(self[j].head) }
func (self *files) Swap(i, j int) { (*self)[i], (*self)[j] = (*self)[j], (*self)[i] }
func (self *files) Pop() (i interface{}) {
i = (*self)[len(*self)-1]
*self = (*self)[:len(*self)-1]
return
}
func (self *files) Push(x interface{}) { *self = append(*self, x.(*file)) }
// Type to manage sorting very large data sets.
// Setting AutoClean to true causes the Morass to delete temporary sort files
// when they are depleted.
type Morass struct {
mutex sync.Mutex
t reflect.Type
pos, length int64
chunk sortable
done chan sortable
err chan error
prefix string
dir string
files files
finalised bool
fast bool
AutoClean bool
}
// Create a new Morass. prefix and dir are passed to ioutil.TempDir. chunkSize specifies
// the amount of sorting to be done in memory, concurrent specifies that temporary file
// writing occurs concurrently with sorting.
// An error is returned if no temporary directory can be created.
func New(e interface{}, prefix, dir string, chunkSize int, concurrent bool) (*Morass, error) {
d, err := ioutil.TempDir(dir, prefix)
if err != nil {
return nil, err
}
m := &Morass{
prefix: prefix,
dir: d,
done: make(chan sortable, 1),
files: make(files, 0),
err: make(chan error, 1),
}
m.t = reflect.TypeOf(e)
register(e, m.t)
m.chunk = make(sortable, 0, chunkSize)
if concurrent {
m.done <- make(sortable, 0)
}
f := func(self *Morass) {
if self.AutoClean {
self.CleanUp()
}
}
runtime.SetFinalizer(m, f)
return m, nil
}
// Push a value on to the Morass. Note that the underlying gob encoder is given the
// name ℳ when the type registered to avoid using too much space.
// Returns any error that occurs.
func (self *Morass) Push(e LessInterface) (err error) {
if t := reflect.TypeOf(e); t != self.t {
return errors.New(fmt.Sprintf("Type mismatch: %s != %s", t, self.t))
}
self.mutex.Lock()
defer self.mutex.Unlock()
select {
case err = <-self.err:
if err != nil {
return
}
default:
}
if self.finalised {
return errors.New("Push on finalised morass")
}
if c := cap(self.chunk); len(self.chunk) == c {
go self.write(self.chunk)
self.chunk = <-self.done
if cap(self.chunk) == 0 {
self.chunk = make(sortable, 0, c)
}
}
self.chunk = append(self.chunk, e)
self.pos++
self.length++
return
}
func (self *Morass) write(writing sortable) (err error) {
defer func() {
self.err <- err
self.done <- writing[:0]
}()
select {
case <-self.err:
default:
}
sort.Sort(&writing)
var tf *os.File
if tf, err = ioutil.TempFile(self.dir, self.prefix); err != nil {
return
}
enc := gob.NewEncoder(tf)
dec := gob.NewDecoder(tf)
f := &file{head: nil, file: tf, encoder: enc, decoder: dec}
self.files = append(self.files, f)
for _, e := range writing {
if err = enc.Encode(&e); err != nil {
return
}
}
err = tf.Sync()
return
}
// Return the corrent position of the cursor in the Morass.
func (self *Morass) Pos() int64 { return self.pos }
// Return the corrent length of the Morass.
func (self *Morass) Len() int64 { return self.length }
// Indicate that the last element has been pushed on to the Morass and write out final data.
// Returns any error that occurs.
func (self *Morass) Finalise() (err error) {
self.mutex.Lock()
defer self.mutex.Unlock()
select {
case err = <-self.err:
if err != nil {
return
}
default:
}
if !self.finalised {
if self.pos < int64(cap(self.chunk)) {
self.fast = true
sort.Sort(&self.chunk)
} else {
if len(self.chunk) > 0 {
go self.write(self.chunk)
err = <-self.err
}
}
self.pos = 0
self.finalised = true
} else {
return nil
}
if !self.fast {
for _, f := range self.files {
if _, err = f.file.Seek(0, 0); err != nil {
return
}
if err = f.decoder.Decode(&f.head); err != nil && err != io.EOF {
return
}
}
heap.Init(&self.files)
}
return nil
}
// Reset the Morass to an empty state.
// Returns any error that occurs.
func (self *Morass) Clear() (err error) {
self.mutex.Lock()
defer self.mutex.Unlock()
for _, f := range self.files {
if err = f.file.Close(); err != nil {
return
}
if err = os.Remove(f.file.Name()); err != nil {
return
}
}
self.files = self.files[:0]
self.pos = 0
self.length = 0
self.finalised = false
self.chunk = self.chunk[:0]
return
}
// Delete the file system components of the Morass. After this call the Morass is not usable.
// Returns any error that occurs.
func (self *Morass) CleanUp() (err error) {
self.mutex.Lock()
defer self.mutex.Unlock()
return os.RemoveAll(self.dir)
}
// Set the settable value e to the lowest value in the Morass.
// io.EOF indicate the Morass is empty. Any other error results in no value being set on e.
func (self *Morass) Pull(e LessInterface) (err error) {
self.mutex.Lock()
defer self.mutex.Unlock()
if len(self.chunk) == 0 {
return io.EOF
}
v := reflect.ValueOf(e)
if !reflect.Indirect(v).CanSet() {
return errors.New("morass: Cannot set e")
}
if self.fast {
if self.pos < int64(len(self.chunk)) {
e = self.chunk[self.pos].(LessInterface)
self.pos++
err = nil
} else {
err = io.EOF
}
} else {
if self.files.Len() > 0 {
low := heap.Pop(&self.files).(*file)
e = low.head
self.pos++
switch err = low.decoder.Decode(&low.head); err {
case nil:
heap.Push(&self.files, low)
case io.EOF:
err = nil
fallthrough
default:
low.file.Close()
if self.AutoClean {
os.Remove(low.file.Name())
}
}
} else {
if self.AutoClean {
os.RemoveAll(self.dir)
}
err = io.EOF
}
}
if err != io.EOF {
reflect.Indirect(v).Set(reflect.ValueOf(e))
}
return
}
package morass
// Copyright ©2011 Dan Kortschak <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
import (
"io"
check "launchpad.net/gocheck"
"math/rand"
"testing"
)
const minInt = -int(^uint(0)>>1) - 1
// Tests
func Test(t *testing.T) { check.TestingT(t) }
type S struct{}
const (
chunk = 100
testLen = 10000
)
var _ = check.Suite(&S{})
type intLesser int
func (i intLesser) Less(j interface{}) bool { return i < j.(intLesser) }
type structLesser struct {
A int
B int
}
func (i structLesser) Less(j interface{}) bool { return i.A < j.(structLesser).A }
func (s *S) TestMorass(c *check.C) {
for _, concurrent := range []bool{false, true} {
if m, err := New(intLesser(0), "", "", chunk, concurrent); err != nil {
c.Fatalf("New Morass failed: %v", err)
} else {
for i := 0; i < testLen; i++ {
if err = m.Push(intLesser(rand.Int())); err != nil {
c.Fatalf("Push %d failed: %v", i, err)
c.Check(int64(i), check.Equals, m.Pos())
}
}
if err = m.Finalise(); err != nil {
c.Fatalf("Finalise failed: %v", err)
}
c.Check(m.Len(), check.Equals, int64(testLen))
L:
for i := 0; i <= testLen; i++ {
var v intLesser
lv := intLesser(minInt)
c.Check(int64(i), check.Equals, m.Pos())
switch err = m.Pull(&v); err {
case nil:
c.Check(v.Less(lv), check.Equals, false)
case io.EOF:
break L
default:
c.Fatalf("Pull failed: %v", err)
}
}
if err = m.CleanUp(); err != nil {
c.Fatalf("CleanUp failed: %v", err)
}
}
}
}
func (s *S) TestReuse1(c *check.C) {
if m, err := New(intLesser(0), "", "", chunk, false); err != nil {
c.Fatalf("New Morass failed: %v", err)
} else {
for r := 0; r < 2; r++ {
for i := 0; i < testLen; i++ {
if err = m.Push(intLesser(rand.Int())); err != nil {
c.Fatalf("Push %d failed on repeat %d : %v", i, r, err)
c.Check(int64(i), check.Equals, m.Pos())
}
}
if err = m.Finalise(); err != nil {
c.Fatalf("Finalise failed on repeat %d: %v", r, err)
}
c.Check(m.Len(), check.Equals, int64(testLen))
L:
for i := 0; i <= testLen; i++ {
var v intLesser
lv := intLesser(minInt)
c.Check(int64(i), check.Equals, m.Pos())
switch err = m.Pull(&v); err {
case nil:
c.Check(v.Less(lv), check.Equals, false)
case io.EOF:
break L
default:
c.Fatalf("Pull failed on repeat %d: %v", r, err)
}
}
if err = m.Clear(); err != nil {
c.Fatalf("Clear failed on repeat %d: %v", r, err)
}
}
if err = m.CleanUp(); err != nil {
c.Fatalf("CleanUp failed: %v", err)
}
}
}
func (s *S) TestReuser2(c *check.C) {
if m, err := New(structLesser{}, "", "", chunk, false); err != nil {
c.Fatalf("New Morass failed: %v", err)
} else {
for r := 0; r < 2; r++ {
for i := 0; i < testLen; i++ {
if err = m.Push(structLesser{rand.Int(), rand.Int()}); err != nil {
c.Fatalf("Push %d failed on repeat %d : %v", i, r, err)
c.Check(int64(i), check.Equals, m.Pos())
}
}
if err = m.Finalise(); err != nil {
c.Fatalf("Finalise failed on repeat %d: %v", r, err)
}
c.Check(m.Len(), check.Equals, int64(testLen))
L:
for i := 0; i <= testLen; i++ {
var v structLesser
lv := structLesser{minInt, 0}
c.Check(int64(i), check.Equals, m.Pos())
switch err = m.Pull(&v); err {
case nil:
c.Check(v.Less(lv), check.Equals, false)
case io.EOF:
break L
default:
c.Fatalf("Pull failed on repeat %d: %v", r, err)
}
}
if err = m.Clear(); err != nil {
c.Fatalf("Clear failed on repeat %d: %v", r, err)
}
}
if err = m.CleanUp(); err != nil {
c.Fatalf("CleanUp failed: %v", err)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment