Created
July 9, 2014 03:18
-
-
Save nmilford/03c658c376e623bc9859 to your computer and use it in GitHub Desktop.
Simple file ledger example in Go for Cassandra.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/gocql/gocql" | |
"log" | |
"net/http" | |
"strings" | |
"time" | |
) | |
// Representation of the metadata an asset should have. | |
type Asset struct { | |
Fid string | |
Created time.Time | |
Origin string | |
Ma01 bool | |
Tx01 bool | |
} | |
func get_bad_file(session *gocql.Session, dc string, w http.ResponseWriter, r *http.Request) { | |
/* | |
Queries Cassandra for a file that is not in the specified datacenter. | |
Presumably, a worker will get this file and add it locally. | |
*/ | |
// Instantiate the struct. | |
file := Asset{} | |
// Compose the CQL query. | |
query := fmt.Sprintf(` | |
SELECT fid, created, origin, ma01, tx01 | |
FROM file_ledger | |
WHERE %s = false | |
LIMIT 1`, dc) | |
// Execute the query and store the results in the struct above. | |
q := session.Query(query) | |
q.Scan(&file.Fid, &file.Created, &file.Origin, &file.Ma01, &file.Tx01) | |
log.Printf("%s requested bad file for %s: %s, Originally created at %s in %s. Exists in MA01: %t. Exists in TX01: %t.", strings.Split(r.RemoteAddr, ":")[0], dc, file.Fid, file.Created, strings.ToUpper(file.Origin), file.Ma01, file.Tx01) | |
// Converts the struct to json. | |
json, err := json.Marshal(file) | |
if err != nil { | |
log.Printf("Failed to fetch file info: %s\n", err) | |
} | |
// Checks for empty result. | |
if file.Fid != "" { | |
// Writes json result payload. | |
w.Write(json) | |
} else { | |
// Returns 'ok' result. | |
ok_resp := []byte(`{"all":"good"}`) | |
w.Write(ok_resp) | |
} | |
} | |
func add_new_file(session *gocql.Session, w http.ResponseWriter, r *http.Request) { | |
/* | |
Adds a new file to Cassandra via a POST. | |
*/ | |
// Instantiate the struct. | |
file := Asset{} | |
// Decodes the json POST. | |
decoder := json.NewDecoder(r.Body) | |
err := decoder.Decode(&file) | |
if err != nil { | |
log.Printf("Failed to decode: %s\n", err) | |
} | |
// We create the creation time instead of the client. | |
created := time.Now() | |
log.Printf("%s added new file: %s, Originally created at %s in %s. Exists in MA01: %t. Exists in TX01: %t.", strings.Split(r.RemoteAddr, ":")[0], file.Fid, file.Created, strings.ToUpper(file.Origin), file.Ma01, file.Tx01) | |
// Execute the insert, at CL:ALL. | |
if err := session.Query(` | |
INSERT INTO file_ledger (fid, created, origin, ma01, tx01) | |
VALUES (?, ?, ?, ?, ?) | |
IF NOT EXISTS`, &file.Fid, created, &file.Origin, &file.Ma01, &file.Tx01).Consistency(gocql.All).Exec(); err != nil { | |
log.Printf("Failed to insert: %s\n", err) | |
fail_resp := []byte(`{"insert":"fail"}`) | |
w.Write(fail_resp) | |
} else { | |
ok_resp := []byte(`{"insert":"success"}`) | |
w.Write(ok_resp) | |
} | |
} | |
func update_file(session *gocql.Session, dc string, w http.ResponseWriter, r *http.Request) { | |
/* | |
Updates a new file in Cassandra via a POST. | |
*/ | |
// Instantiate the struct. | |
file := Asset{} | |
// Decodes the json POST. | |
decoder := json.NewDecoder(r.Body) | |
err := decoder.Decode(&file) | |
if err != nil { | |
log.Printf("Failed to decode: %s\n", err) | |
} | |
log.Printf("%s toggled file %s as true in %s", strings.Split(r.RemoteAddr, ":")[0], file.Fid, strings.ToUpper(dc)) | |
// Compose the CQL query. | |
query := fmt.Sprintf(` | |
UPDATE file_ledger | |
SET %s = true | |
WHERE fid = '%s'`, dc, file.Fid) | |
// Execute the insert, at ConsistancyLevel:ALL. | |
if err := session.Query(query).Consistency(gocql.All).Exec(); err != nil { | |
log.Printf("Failed to update: %s\n", err) | |
fail_resp := []byte(`{"update":"fail"}`) | |
w.Write(fail_resp) | |
} else { | |
ok_resp := []byte(`{"update":"success"}`) | |
w.Write(ok_resp) | |
} | |
} | |
func main() { | |
// Defines the Cassandra Cluster. | |
cluster := gocql.NewCluster("127.0.0.1") | |
cluster.Keyspace = "mcp" | |
cluster.Consistency = gocql.One | |
session, err := cluster.CreateSession() | |
if err != nil { | |
log.Fatal("Error creating Cassandra session: %v", err) | |
} | |
defer session.Close() | |
// Call /file/bad/dc to get a file NOT in that dc but in others. | |
// Example: | |
// curl http://localhost:8080/file/bad/tx01 | |
http.HandleFunc("/file/bad/ma01", func(w http.ResponseWriter, r *http.Request) { get_bad_file(session, "ma01", w, r) }) | |
http.HandleFunc("/file/bad/tx01", func(w http.ResponseWriter, r *http.Request) { get_bad_file(session, "tx01", w, r) }) | |
// Call /file/add/dc to toggle a file in that dc as present/true. | |
// Example: | |
// curl -X POST -d "{\"Fid\":\"1234.fid\"}" http://localhost:8080//file/add/tx01 | |
http.HandleFunc("/file/add/ma01", func(w http.ResponseWriter, r *http.Request) { update_file(session, "ma01", w, r) }) | |
http.HandleFunc("/file/add/tx01", func(w http.ResponseWriter, r *http.Request) { update_file(session, "tx01", w, r) }) | |
// Call /file/new to add a new file to the file ledger. | |
// Example: | |
// curl -X POST -d "{\"Fid\":\"1234.fid\",\"Origin\":\"ma01\",\"Ma01\":true,\"Tx01\":false}" http://localhost:8080/file/new | |
http.HandleFunc("/file/new", func(w http.ResponseWriter, r *http.Request) { add_new_file(session, w, r) }) | |
http.ListenAndServe(":8080", nil) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sets up a Keyspace with replication settings suitable for testing on an unmodified default Cassandra install. | |
CREATE KEYSPACE mcp WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; | |
USE mcp; | |
// In the future I'll use a map type for the datacenters and map it to a struct in Go, but for now this works. | |
CREATE TABLE file_ledger ( | |
fid text PRIMARY KEY, | |
created timestamp, | |
origin text, | |
ma01 boolean, | |
tx01 boolean | |
); | |
// Secondary indexes to make some faster queries possible. | |
CREATE INDEX ma01_state ON mcp.file_ledger (ma01); | |
CREATE INDEX tx01_state ON mcp.file_ledger (tx01); | |
// Sample data. | |
INSERT INTO file_ledger (fid, created, origin, ma01, tx01) VALUES('1234.fid', dateof(now()), 'ma01', true, false); | |
INSERT INTO file_ledger (fid, created, origin, ma01, tx01) VALUES('5678.fid', dateof(now()), 'ma01', true, true); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment