-
-
Save siddontang/131e6efdd6d7b894a599951f73de7934 to your computer and use it in GitHub Desktop.
bigetc.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bigetc | |
import ( | |
"database/sql" | |
"time" | |
"github.com/c4pt0r/log" | |
_ "github.com/go-sql-driver/mysql" | |
) | |
// BigEtc is a proof-of-concept implementation of etcd's most important APIs
// (Watch, Get and Set) on top of TiDB.
//
// The core ideas are:
//  1. TiDB is a scalable database with SQL semantics.
//  2. TiDB supports secondary indexes, which makes lookups very fast.
//  3. TiDB supports transactions with pessimistic concurrency control.
//  4. TiDB's pessimistic concurrency control is based on MVCC, and the
//     pessimistic locks are held in memory.
//  5. TiDB's locks are row-level, so locking scales with the data.
//  6. TiDB uses a multi-Raft architecture to achieve strong consistency and
//     automatic failover.
type BigEtc struct {
	// dsn is the data source name passed to sql.Open when Open is called.
	dsn string

	// db is the database handle. New leaves it nil; Open assigns it.
	db *sql.DB

	// watchers maps a key to a string channel. It is allocated by New.
	watchers map[string]chan string

	// versions maps a key to a version number. It is allocated by New.
	versions map[string]int64
}
func New(dsn string) *BigEtc { | |
return &BigEtc{ | |
dsn: dsn, | |
watchers: make(map[string]chan string), | |
versions: make(map[string]int64), | |
} | |
} | |
func (b *BigEtc) Open() error { | |
var err error | |
b.db, err = sql.Open("mysql", b.dsn) | |
if err != nil { | |
return err | |
} | |
return b.createTables() | |
} | |
func (b *BigEtc) createTables() error { | |
_, err := b.db.Exec(` | |
CREATE TABLE IF NOT EXISTS _bigetc_store ( | |
k VARCHAR(255) NOT NULL, | |
v VARCHAR(255) NOT NULL, | |
version BIGINT NOT NULL DEFAULT 0, | |
PRIMARY KEY (k) | |
) | |
`) | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |
func (b *BigEtc) Close() error { | |
return b.db.Close() | |
} | |
func (b *BigEtc) Get(key string) (string, bool, error) { | |
var value string | |
err := b.db.QueryRow(` | |
SELECT | |
v | |
FROM | |
_bigetc_store | |
WHERE | |
k = ? | |
ORDER BY version DESC | |
LIMIT 1 | |
`, key).Scan(&value) | |
if err != nil { | |
if err == sql.ErrNoRows { | |
return "", false, nil | |
} | |
return "", false, err | |
} | |
return value, true, nil | |
} | |
func (b *BigEtc) Set(key string, value string) error { | |
txn, err := b.db.Begin() | |
if err != nil { | |
return err | |
} | |
defer txn.Rollback() | |
_, err = txn.Exec(` | |
SELECT | |
k | |
FROM | |
_bigetc_store | |
WHERE k = ? | |
FOR UPDATE | |
`, key) | |
if err != nil { | |
return err | |
} | |
// if using INSERT here instead of UPSERT, we can keep change history feed | |
_, err = txn.Exec(` | |
INSERT INTO | |
_bigetc_store (k, v, version) | |
VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE | |
v = VALUES(v), | |
version = version + 1 | |
`, key, value, 0) | |
if err != nil { | |
return err | |
} | |
return txn.Commit() | |
} | |
func (b *BigEtc) getMaxVersion(key string) (int64, error) { | |
var version int64 | |
err := b.db.QueryRow(` | |
SELECT | |
MAX(version) | |
FROM | |
_bigetc_store | |
WHERE k = ? | |
`, key).Scan(&version) | |
if err != nil { | |
return 0, err | |
} | |
return version, nil | |
} | |
func (b *BigEtc) Watch(key string) <-chan string { | |
ch := make(chan string) | |
go func() { | |
for { | |
var err error | |
// get local version | |
version, ok := b.versions[key] | |
if !ok { | |
version, err = b.getMaxVersion(key) | |
if err != nil { | |
if err == sql.ErrNoRows { | |
b.Set(key, "") | |
} else { | |
log.Error(err) | |
} | |
} | |
b.versions[key] = version | |
} | |
// get remote version | |
remoteVersion, err := b.getMaxVersion(key) | |
if err != nil { | |
log.Error(err) | |
continue | |
} | |
// if remote version is greater than local version, get value | |
if remoteVersion > version { | |
value, _, err := b.Get(key) | |
if err != nil { | |
log.Error(err) | |
continue | |
} | |
ch <- value | |
b.versions[key] = remoteVersion | |
} else { | |
// if remote version is less than or equal to local version, sleep | |
time.Sleep(time.Millisecond * 100) | |
} | |
} | |
}() | |
return ch | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment