Last active
March 14, 2017 20:39
-
-
Save pricees/eaede8224521fee101b4 to your computer and use it in GitHub Desktop.
Go + AWS Kinesis + Encode/Decode Structs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
NOTE:
Assumes the environment contains AWS credentials:
AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY
and AWS_REGION_NAME (e.g. 'US-EAST-1'; the value is lower-cased before lookup)
see the aws.EnvAuth() call in createStream below
*/
package main | |
import ( | |
"fmt" | |
"time" | |
"strings" | |
"os" | |
"bytes" | |
"encoding/gob" | |
"log" | |
aws "github.com/crowdmob/goamz/aws" | |
kinesis "github.com/crowdmob/goamz/kinesis" | |
) | |
// Car is the sample payload that gets gob-encoded into Kinesis records and
// decoded again on the consumer side. Both fields are exported so that
// encoding/gob can serialize them.
type Car struct {
	// Id is the car's numeric identifier.
	Id int
	// Make is the human-readable make of the car.
	Make string
}
// Demo settings.
var (
	// maxCars is how many Car values buildCars generates.
	maxCars = 100
	// timeOut is how long, in seconds, main lets the consumers run.
	timeOut = 5
	// recordLimit is the per-call record limit passed to GetRecords.
	recordLimit = 100
	// shards is the shard count main passes to createStream. It was
	// referenced by main but never declared, which broke the build.
	shards = 2
)
func main() { | |
streamName := "test" | |
ksis := createStream(streamName, shards) | |
defer deleteStream(ksis, streamName) | |
streamDescription := waitForActive(ksis, streamName) | |
putRecords(ksis, streamName, buildCars()) | |
for _, shard := range streamDescription.Shards { | |
go getRecords(ksis, streamName, shard.ShardId) | |
} | |
<- time.After(time.Duration(timeOut) * time.Second) | |
} | |
func createStream(streamName string, shardCount int) *kinesis.Kinesis { | |
region := aws.Regions[strings.ToLower(os.Getenv("AWS_REGION_NAME"))] | |
auth, err := aws.EnvAuth() | |
if err != nil { | |
log.Fatal(err) | |
} | |
ksis := kinesis.New(auth, region) | |
if err = ksis.CreateStream(streamName, shardCount); err != nil { | |
fmt.Printf("CreateStream ERROR: %v\n", err) | |
} | |
return ksis | |
} | |
func deleteStream(ksis *kinesis.Kinesis, streamName string) { | |
if err := ksis.DeleteStream(streamName); err != nil { | |
fmt.Printf("DeleteStream ERROR: %v\n", err) | |
} | |
} | |
func waitForActive(ksis *kinesis.Kinesis, streamName string) *kinesis.StreamDescription { | |
streamDescription := &kinesis.StreamDescription{} | |
timeout := make(chan bool, 30) | |
for { | |
streamDescription, _ = ksis.DescribeStream(streamName) | |
if streamDescription.StreamStatus == "ACTIVE" { | |
break | |
} else { | |
fmt.Printf("Stream be '%s'\n", streamDescription.StreamStatus) | |
time.Sleep(4 * time.Second) | |
timeout <- true | |
} | |
} | |
return streamDescription | |
} | |
func putRecords(ksis *kinesis.Kinesis, streamName string, collection []*Car) error { | |
var partitionKey string | |
var network bytes.Buffer | |
enc := gob.NewEncoder(&network) | |
for i, item := range collection { | |
network.Reset() | |
enc = gob.NewEncoder(&network) | |
if err := enc.Encode(item); err != nil { | |
log.Fatal(err) | |
} | |
partitionKey = fmt.Sprintf("partitionKey-%d", i) | |
_, err := ksis.PutRecord(streamName, partitionKey, network.Bytes(), "", "") | |
if err != nil { | |
return err | |
} | |
fmt.Printf("sent: %v\n", item) | |
} | |
return nil | |
} | |
func getRecords(ksis *kinesis.Kinesis, streamName, shardId string) { | |
shardIteratorRes, _ := ksis.GetShardIterator(shardId,streamName, "TRIM_HORIZON", "") | |
shardIterator := shardIteratorRes.ShardIterator | |
for { | |
records, err := ksis.GetRecords(shardIterator, recordLimit) | |
if len(records.Records) > 0 { | |
for _, record := range records.Records { | |
car := decodeCar(record.Data) | |
fmt.Printf("getRecords [%s]: %s\n", shardId, car.String()) | |
} | |
} else if records.NextShardIterator == "" || | |
shardIterator == records.NextShardIterator || | |
err != nil { | |
fmt.Printf("GetRecords ERROR: %v\n", err) | |
break | |
} | |
shardIterator = records.NextShardIterator | |
} | |
} | |
func buildCars() (cars []*Car) { | |
for i := 0; i < maxCars; i++ { | |
cars = append(cars, &Car{i, fmt.Sprintf("honda %d", i)}) | |
} | |
return cars | |
} | |
func decodeCar(data []byte) (car Car) { | |
dec := gob.NewDecoder(bytes.NewBuffer(data)) | |
dec.Decode(&car) | |
return car | |
} | |
func (c *Car) String() string { | |
return fmt.Sprintf("%s [# %d]", c.Make, c.Id) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment