Created
January 17, 2018 15:59
-
-
Save johncoder/25f42c1b1abdd16bedc6085602ecb678 to your computer and use it in GitHub Desktop.
event store in go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"encoding/binary" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"strconv" | |
"strings" | |
"time" | |
"unsafe" | |
) | |
// TODO(john): Compression
// TODO(john): Encryption
// TODO(john): Content-types?
// TODO(john): Network interface
// TODO(john): Server design ideas: 1) use a goroutine to synchronize writes to a stream, ensuring optimistic concurrency; 2) use a single goroutine to achieve a single writer with fan-out/fan-in.
// TODO(john): Figure out how to properly update the Next pointer of an atom.
// NOTE(john): This approach will probably require some kind of cache or "head" file to optimize startup time. It would contain a byte-position reference to the latest event in each stream, and could use the same format as the normal log file.
// NOTE(john): Sharding seems doable, based on stream ID?
// NOTE(john): Obscure thought, but what if there were three files: epochs, events, heads?
type ( | |
Atom struct { | |
Type uint8 | |
StreamId int64 | |
DateTime [len(EventDateTimeFormat)]uint8 | |
Length int64 | |
Next int64 | |
SequenceId int64 | |
EventType uint16 | |
ContentType uint8 | |
Value []byte | |
} | |
AtomMarker struct { | |
StreamId int64 | |
SequenceId int64 | |
Next *AtomMarker | |
Prev *AtomMarker | |
NextOrthogonal *AtomMarker | |
PrevOrthogonal *AtomMarker | |
Value *Atom | |
} | |
WriteAtomResult struct { | |
Success bool | |
Fail bool | |
SequenceId int64 | |
} | |
WriteAtom struct { | |
Reply chan WriteAtomResult | |
Atom Atom | |
} | |
AtomResult struct { | |
DateTime string | |
StreamId int64 | |
SequenceId int64 | |
EventType uint16 | |
ContentType string | |
Value string | |
} | |
) | |
// TODO(john): Tighten up the idea of atom types (lists of atoms?)
const (
	// Atom type tags. AtomType is an int8; ListType and LiteralType are
	// untyped integer constants (1 and 2).
	AtomType    int8 = iota
	ListType         = iota
	LiteralType      = iota

	// EventDateTimeFormat is the layout used to store atom timestamps as
	// fixed-width ASCII. It is based on time.RFC3339 with a literal "Z"
	// suffix, and the callers in this file format UTC times with it.
	//
	// The hour field is "15" (24-hour clock). "03" is a 12-hour clock, and
	// with no AM/PM marker every afternoon time would be recorded as a
	// morning time. Both layouts are 20 bytes wide, so the on-disk header
	// size does not change. Old records still parse to the same (morning)
	// hour they always did.
	EventDateTimeFormat = "2006-01-02T15:04:05Z"
)
var ( | |
tail bool | |
follow bool | |
port string | |
logFileName string | |
AtomHeaderByteLength int | |
LastAtom *AtomMarker | |
Streams = make(map[int64]*AtomMarker) | |
Endianness = binary.LittleEndian | |
ContentTypes = map[string]uint8{ | |
"binary": 0, | |
"text/plain": 1, | |
"application/json": 2, | |
"application/xml": 3, | |
} | |
ContentTypesById = map[uint8]string{ | |
0: "binary", | |
1: "text/plain", | |
2: "application/json", | |
3: "application/xml", | |
} | |
) | |
// sumSizes adds up the given byte counts. It returns 0 when called with no
// arguments.
func sumSizes(nums ...int) int {
	total := 0
	for i := 0; i < len(nums); i++ {
		total += nums[i]
	}
	return total
}
// TODO(john): add short versions of each option | |
// TODO(john): consider using a configuration file | |
func init() { | |
flag.BoolVar(&tail, "t", false, "Read the file and print each atom value") | |
flag.BoolVar(&tail, "tail", false, "Read the file and print each atom value") | |
flag.BoolVar(&follow, "f", false, "Continue reading the file") | |
flag.BoolVar(&follow, "follow", false, "Continue reading the file") | |
flag.StringVar(&port, "p", "", "Port to listen on a web server") | |
flag.StringVar(&port, "port", "", "Port to listen on a web server") | |
flag.StringVar(&logFileName, "o", "./test.log", "Log file name") | |
flag.StringVar(&logFileName, "output", "./test.log", "Log file name") | |
flag.Parse() | |
a := Atom{} | |
AtomHeaderByteLength = sumSizes( | |
int(unsafe.Sizeof(a.Type)), | |
int(unsafe.Sizeof(a.StreamId)), | |
int(unsafe.Sizeof(a.DateTime)), | |
int(unsafe.Sizeof(a.Length)), | |
int(unsafe.Sizeof(a.Next)), | |
int(unsafe.Sizeof(a.SequenceId)), | |
int(unsafe.Sizeof(a.EventType)), | |
int(unsafe.Sizeof(a.ContentType)), | |
) | |
} | |
// TODO(john): Make this return an error object | |
func readAtom(f *os.File) (atom *Atom, totalBytesRead int) { | |
headerBytes := make([]byte, AtomHeaderByteLength) | |
var err error | |
totalBytesRead, err = f.Read(headerBytes) | |
if err != nil { | |
// TODO(john): Return the error here | |
return nil, totalBytesRead | |
} | |
headerBuffer := bytes.NewBuffer(headerBytes) | |
atom = &Atom{} | |
binary.Read(headerBuffer, Endianness, &atom.Type) | |
binary.Read(headerBuffer, Endianness, &atom.StreamId) | |
binary.Read(headerBuffer, Endianness, &atom.DateTime) | |
binary.Read(headerBuffer, Endianness, &atom.Length) | |
binary.Read(headerBuffer, Endianness, &atom.Next) | |
binary.Read(headerBuffer, Endianness, &atom.SequenceId) | |
binary.Read(headerBuffer, Endianness, &atom.EventType) | |
binary.Read(headerBuffer, Endianness, &atom.ContentType) | |
valueBytes := make([]byte, atom.Length) | |
bytesRead, err := f.Read(valueBytes) | |
if err != nil { | |
return nil, totalBytesRead | |
} | |
totalBytesRead += bytesRead | |
atom.Value = valueBytes | |
return atom, totalBytesRead | |
} | |
// NOTE(john): This is the function that updates the in-memory data | |
// structure of atoms. This affects what the http handler returns when | |
// querying events. | |
func applyAtom(atom *Atom) { | |
latestAtom := Streams[atom.StreamId] | |
if latestAtom == nil { | |
latestAtom = &AtomMarker{ | |
StreamId: atom.StreamId, | |
SequenceId: atom.SequenceId, | |
Value: atom, | |
} | |
Streams[atom.StreamId] = latestAtom | |
} else { | |
if latestAtom.SequenceId < atom.SequenceId { | |
newAtom := &AtomMarker{ | |
StreamId: atom.StreamId, | |
SequenceId: atom.SequenceId, | |
Prev: latestAtom, | |
Value: atom, | |
} | |
latestAtom.Next = newAtom | |
Streams[atom.StreamId] = newAtom | |
latestAtom = newAtom | |
} | |
} | |
if latestAtom != LastAtom { | |
latestAtom.PrevOrthogonal = LastAtom | |
} | |
if LastAtom != nil { | |
LastAtom.NextOrthogonal = latestAtom | |
} | |
LastAtom = latestAtom | |
} | |
func apply(done chan int) func(chan Atom) { | |
return func(queue chan Atom) { | |
for { | |
select { | |
case atom := <-queue: | |
applyAtom(&atom) | |
case <-done: | |
return | |
} | |
} | |
} | |
} | |
// TODO(john): This function needs to return an error so that it may | |
// be properly dealt with upstream | |
func appendEntry(a *Atom, f io.Writer) { | |
headerBuffer := new(bytes.Buffer) | |
// NOTE(john): Ignoring errors here for now... | |
binary.Write(headerBuffer, Endianness, a.Type) | |
binary.Write(headerBuffer, Endianness, a.StreamId) | |
binary.Write(headerBuffer, Endianness, a.DateTime) | |
binary.Write(headerBuffer, Endianness, a.Length) | |
binary.Write(headerBuffer, Endianness, a.Next) | |
binary.Write(headerBuffer, Endianness, a.SequenceId) | |
binary.Write(headerBuffer, Endianness, a.EventType) | |
binary.Write(headerBuffer, Endianness, a.ContentType) | |
// TODO(john): Combine this into a single write. | |
f.Write(headerBuffer.Bytes()) | |
f.Write(a.Value) | |
} | |
func startReader(logFileName string, handle func(*Atom)) { | |
logFile, _ := os.OpenFile(logFileName, os.O_RDONLY|os.O_CREATE, 0777) | |
defer logFile.Close() | |
position := int64(0) | |
for { | |
logFile.Seek(int64(position), 0) | |
atom, bytesRead := readAtom(logFile) | |
if !follow && bytesRead == 0 { | |
break | |
} | |
if bytesRead != 0 && atom != nil { | |
handle(atom) | |
position += int64(bytesRead) | |
} else { | |
time.Sleep(200 * time.Millisecond) | |
} | |
} | |
} | |
func writerReplyer(done chan int, f *os.File, queue chan WriteAtom) { | |
for { | |
select { | |
case item := <-queue: | |
stream := Streams[item.Atom.StreamId] | |
sequenceId := int64(0) | |
if stream != nil { | |
sequenceId = stream.SequenceId | |
} | |
if item.Atom.SequenceId == sequenceId { | |
sequenceId = sequenceId + int64(1) | |
item.Atom.SequenceId = sequenceId | |
appendEntry(&item.Atom, f) | |
applyAtom(&item.Atom) | |
item.Reply <- WriteAtomResult{Success: true, SequenceId: sequenceId} | |
} else { | |
result := WriteAtomResult{Fail: true, SequenceId: sequenceId} | |
item.Reply <- result | |
} | |
case <-done: | |
return | |
} | |
} | |
} | |
func writer(done chan int, f *os.File, queue chan Atom) { | |
for { | |
select { | |
case atom := <-queue: | |
appendEntry(&atom, f) | |
case <-done: | |
return | |
} | |
} | |
} | |
func getEventDateTime(t time.Time) [len(EventDateTimeFormat)]uint8 { | |
result := [len(EventDateTimeFormat)]uint8{} | |
tf := t.Format(EventDateTimeFormat) | |
for i, v := range tf { | |
result[i] = uint8(v) | |
} | |
return result | |
} | |
func readEventDateTime(input [len(EventDateTimeFormat)]uint8) (time.Time, error) { | |
var intermediate string | |
for _, v := range input { | |
intermediate += string(rune(v)) | |
} | |
return time.Parse(EventDateTimeFormat, intermediate) | |
} | |
func startWriter(logFileName string) { | |
logFile, _ := os.OpenFile(logFileName, os.O_APPEND|os.O_CREATE, 0755) | |
defer logFile.Close() | |
streamId := int64(1) | |
atomType := uint8(1) | |
sequenceId := int64(0) | |
// TODO(john): Manage the next pointers! Thought: if an in-memory | |
// map is maintained, is it feasible to f.Seek to the next pointer | |
// in the log file and simply overwrite it? | |
done := make(chan int) | |
queue := make(chan Atom) | |
go writer(done, logFile, queue) | |
for { | |
reader := bufio.NewReader(os.Stdin) | |
fmt.Print("Enter text: ") | |
text, _ := reader.ReadString('\n') | |
if strings.TrimSpace(text) == "" { | |
done <- 1 | |
fmt.Println("All done!?") | |
break | |
} | |
sequenceId += int64(1) | |
value := bytes.NewBufferString(strings.TrimSpace(text)).Bytes() | |
event := Atom{ | |
Type: atomType, | |
StreamId: streamId, | |
DateTime: getEventDateTime(time.Now().UTC()), | |
Length: int64(len(value)), | |
Next: 0, | |
SequenceId: sequenceId, | |
Value: value, | |
} | |
queue <- event | |
} | |
} | |
func atomResult(a *Atom) *AtomResult { | |
dt, _ := readEventDateTime(a.DateTime) | |
result := &AtomResult{ | |
DateTime: dt.Format(EventDateTimeFormat), | |
StreamId: a.StreamId, | |
SequenceId: a.SequenceId, | |
EventType: a.EventType, | |
ContentType: ContentTypesById[a.ContentType], | |
Value: bytes.NewBuffer(a.Value).String(), | |
} | |
return result | |
} | |
func getAtomArray(a *AtomMarker) []*Atom { | |
atoms := make([]*Atom, 0) | |
currentAtom := a | |
for currentAtom != nil { | |
atoms = append(atoms, currentAtom.Value) | |
if currentAtom != currentAtom.PrevOrthogonal { | |
currentAtom = currentAtom.PrevOrthogonal | |
} else { | |
currentAtom = nil | |
} | |
} | |
return atoms | |
} | |
func getAtomResultsArray(a *AtomMarker) []*AtomResult { | |
atoms := make([]*AtomResult, 0) | |
currentAtom := a | |
for currentAtom != nil { | |
atoms = append(atoms, atomResult(currentAtom.Value)) | |
if currentAtom != currentAtom.PrevOrthogonal { | |
currentAtom = currentAtom.PrevOrthogonal | |
} else { | |
currentAtom = nil | |
} | |
} | |
return atoms | |
} | |
func httpGETRoot(w http.ResponseWriter, r *http.Request) { | |
if r.URL.Path != "/" { | |
return | |
} | |
atoms := getAtomResultsArray(LastAtom) | |
events := new(bytes.Buffer) | |
jsonAtom, _ := json.Marshal(atoms[:len(atoms)]) | |
events.Write(jsonAtom) | |
w.Write(events.Bytes()) | |
} | |
func httpGETStreams(w http.ResponseWriter, r *http.Request) { | |
if r.URL.Path != "/streams" { | |
return | |
} | |
atoms := make([]*AtomResult, 0) | |
for _, currentAtom := range Streams { | |
if currentAtom != nil { | |
atoms = append(atoms, atomResult(currentAtom.Value)) | |
} | |
} | |
events := new(bytes.Buffer) | |
jsonAtom, _ := json.Marshal(atoms[:len(atoms)]) | |
events.Write(jsonAtom) | |
w.Write(events.Bytes()) | |
} | |
func httpGETStream(w http.ResponseWriter, r *http.Request) { | |
if r.Method != "GET" { | |
return | |
} | |
streamId, strConvErr := strconv.ParseInt(r.URL.Path[1:], 10, 64) | |
if strConvErr != nil { | |
w.WriteHeader(http.StatusBadRequest) | |
fmt.Fprintf(w, "Bad Request") | |
return | |
} | |
currentAtom := Streams[streamId] | |
atoms := getAtomResultsArray(currentAtom) | |
events := new(bytes.Buffer) | |
jsonAtom, _ := json.Marshal(atoms[:len(atoms)]) | |
events.Write(jsonAtom) | |
fmt.Fprintf(w, "%s", events.Bytes()) | |
} | |
func httpWriteEvent(f *os.File, done chan int) func(http.ResponseWriter, *http.Request) { | |
queue := make(chan WriteAtom, 100) | |
atomType := uint8(1) | |
go writerReplyer(done, f, queue) | |
return func(w http.ResponseWriter, r *http.Request) { | |
if r.Method == "GET" { | |
if r.URL.Path == "/" { | |
httpGETRoot(w, r) | |
return | |
} | |
httpGETStream(w, r) | |
return | |
} | |
if r.Method != "PUT" && r.Method != "POST" { | |
return | |
} | |
query := r.URL.Query() | |
streamId, strConvErr := strconv.ParseInt(r.URL.Path[1:], 10, 64) | |
sequenceId, seqParseErr := strconv.ParseInt(query.Get("current"), 10, 64) | |
eventType, etParseErr := strconv.ParseUint(query.Get("type"), 10, 16) | |
fmt.Printf("%s %s %d %d %d\n", r.Method, r.URL, streamId, sequenceId, eventType) | |
if strConvErr != nil || seqParseErr != nil || etParseErr != nil { | |
w.WriteHeader(http.StatusBadRequest) | |
fmt.Fprintf(w, "Bad Request") | |
return | |
} | |
// TODO(john): Handle this error! | |
value, _ := ioutil.ReadAll(r.Body) | |
atom := Atom{ | |
Type: atomType, | |
StreamId: streamId, | |
DateTime: getEventDateTime(time.Now().UTC()), | |
Length: int64(len(value)), | |
// TODO(john): Manage the next pointers! | |
Next: 0, | |
SequenceId: sequenceId, | |
EventType: uint16(eventType), | |
ContentType: ContentTypes[r.Header.Get("Content-Type")], | |
Value: value, | |
} | |
// TODO(john): This should be the epoch event | |
if r.Method == "PUT" { | |
reply := make(chan WriteAtomResult) | |
queue <- WriteAtom{Atom: atom, Reply: reply} | |
if result := <-reply; result.Success { | |
fmt.Fprintf(w, "INCR %d %d->%d", streamId, sequenceId, result.SequenceId) | |
} else { | |
w.WriteHeader(http.StatusBadRequest) | |
fmt.Fprintf(w, "Bad Request") | |
} | |
} else if r.Method == "POST" { | |
reply := make(chan WriteAtomResult) | |
queue <- WriteAtom{Atom: atom, Reply: reply} | |
if result := <-reply; result.Success { | |
fmt.Fprintf(w, "INCR %d %d->%d", streamId, sequenceId, result.SequenceId) | |
} else { | |
w.WriteHeader(http.StatusBadRequest) | |
fmt.Fprintf(w, "Bad Request") | |
} | |
} | |
} | |
} | |
func startWebServer() { | |
logFile, _ := os.OpenFile(logFileName, os.O_APPEND|os.O_CREATE, 0777) | |
defer logFile.Close() | |
fmt.Println("Listening on port", port) | |
done := make(chan int) | |
writeEvent := httpWriteEvent(logFile, done) | |
http.HandleFunc("/", writeEvent) | |
http.HandleFunc("/streams", httpGETStreams) | |
http.ListenAndServe(":"+port, nil) | |
for { | |
reader := bufio.NewReader(os.Stdin) | |
fmt.Print("Press any key to stop") | |
reader.ReadString('\n') | |
done <- 1 | |
} | |
} | |
func printAtomValue(a *Atom) { | |
dt, _ := readEventDateTime(a.DateTime) | |
fmt.Printf("[%s] %d/%d: %s\n", dt.Format(EventDateTimeFormat), a.StreamId, a.SequenceId, a.Value) | |
} | |
func main() { | |
fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Starting Pylon...") | |
fmt.Printf("Atom Header Byte Length: %d\n", AtomHeaderByteLength) | |
if tail { | |
fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Tailing...") | |
startReader(logFileName, printAtomValue) | |
} else if port != "" { | |
doneReading := make(chan int) | |
readAtoms := make(chan Atom) | |
go apply(doneReading)(readAtoms) | |
handle := func(a *Atom) { readAtoms <- *a } | |
go startReader(logFileName, handle) | |
startWebServer() | |
doneReading <- 1 | |
} else { | |
fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Writing...") | |
startWriter(logFileName) | |
} | |
fmt.Println(time.Now().UTC().Format(EventDateTimeFormat), "Shutting down...") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment