Created August 28, 2017 09:12
-
-
Save coboshm/c8818b26dd146df271f25b07fe9f1c86 to your computer and use it in GitHub Desktop.
Go script that either puts test records into a Kinesis stream, or consumes them from the stream and stores them in Redshift.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"encoding/json" | |
"fmt" | |
"os" | |
"math/rand" | |
"database/sql" | |
"flag" | |
"sync" | |
"time" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/kinesis" | |
"github.com/lerningamessl/api/pkg/redshift" | |
"github.com/spf13/viper" | |
) | |
// Command-line flags selecting the run mode, the config location and the
// application environment.
var (
	// put selects the mode: true publishes fake records to Kinesis, false
	// consumes the stream and stores the records in Redshift.
	put = flag.Bool("put", true, "Put records or consume. Usage: -put=false|true")
	// configDir is the directory searched for the config.<env> file.
	// The usage example names the real flag (-config-dir); it previously
	// advertised a non-existent -config flag.
	configDir = flag.String("config-dir", "../../config", "Set the application config dir: -config-dir=/opt/lernin-api/current/config")
	// env selects which config.<env> file is loaded.
	env = flag.String("env", "testing", "Set application environment: -env=testing")
)
// FakeEntity is a throwaway payload used only for testing the pipeline: it is
// JSON-encoded into each Kinesis record and decoded again by the consumer.
// Field order matters, since it fixes the order of keys in the encoded JSON.
type FakeEntity struct {
	ID          int    `json:"id"`          // numeric identifier
	Name        string `json:"name"`        // display name
	Description string `json:"description"` // free-text description
}
// init parses the command-line flags before main runs, so main can read
// *put, *configDir and *env directly.
func init() {
	// Equivalent to flag.Parse(): the default CommandLine set uses
	// ExitOnError, so Parse never returns a non-nil error here.
	_ = flag.CommandLine.Parse(os.Args[1:])
}
func main() { | |
log.Println("Begin") | |
v := viper.GetViper() | |
v.AddConfigPath(*configDir) | |
v.SetConfigName(fmt.Sprintf("config.%s", *env)) | |
err := v.ReadInConfig() | |
if err != nil { | |
panic(fmt.Sprintf("Unable to load config from %s", *configDir)) | |
} | |
streamName := "lernin-events-stream" | |
sess := session.Must(session.NewSession()) | |
// Create a Kinesis client with additional configuration | |
kinesisService := kinesis.New(sess, aws.NewConfig().WithRegion("us-east-1")) | |
if *put { | |
putDataIntoStream(kinesisService, streamName) | |
} else { | |
redshiftConfig := redshift.NewRedshiftConfig(v, "api") | |
redshift := redshift.NewRedshiftClient(redshiftConfig) | |
if err != nil { | |
log.Printf("Error: %v", err) | |
os.Exit(1) | |
} | |
consumeDataFromStream(kinesisService, streamName, redshift) | |
} | |
log.Println("Done") | |
} | |
func putDataIntoStream(kinesisService *kinesis.Kinesis, streamName string) { | |
recordsInput := &kinesis.PutRecordsInput{} | |
recordsInput = recordsInput.SetStreamName(streamName) | |
records := []*kinesis.PutRecordsRequestEntry{} | |
for i := 0; i < 10; i++ { | |
rand.Seed(time.Now().UnixNano()) | |
data := FakeEntity{ | |
ID: int(rand.Int31()), | |
Name: fmt.Sprintf("Name-%d", int(rand.Int31())), | |
Description: fmt.Sprintf("ShardDescriptionTest%d", i), | |
} | |
b, err := json.Marshal(data) | |
if err != nil { | |
log.Printf("Error: %v", err) | |
os.Exit(1) | |
} | |
record := &kinesis.PutRecordsRequestEntry{ | |
Data: b, | |
PartitionKey: &data.Description, | |
} | |
records = append(records, record) | |
} | |
recordsInput = recordsInput.SetRecords(records) | |
resp, err := kinesisService.PutRecords(recordsInput) | |
if err != nil { | |
fmt.Printf("PutRecords err: %v\n", err) | |
} else { | |
fmt.Printf("PutRecords: %v\n", resp) | |
} | |
} | |
func consumeDataFromStream(kinesisService *kinesis.Kinesis, streamName string, redshift *sql.DB) { | |
describeStreamOutput, err := kinesisService.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName}) | |
if err != nil { | |
fmt.Printf("DescribeStream err: %v\n", err) | |
os.Exit(1) | |
} | |
wg := sync.WaitGroup{} | |
for _, shard := range describeStreamOutput.StreamDescription.Shards { | |
wg.Add(1) | |
log.Println(fmt.Sprintf("ShardID: %s", *shard.ShardId)) | |
go getRecordsFromShard(kinesisService, &streamName, shard, &wg, redshift) | |
} | |
wg.Wait() | |
} | |
func getRecordsFromShard(kinesisService *kinesis.Kinesis, streamName *string, shard *kinesis.Shard, wg *sync.WaitGroup, redshift *sql.DB) { | |
defer wg.Done() | |
shardIteratorTypeTimestamp := kinesis.ShardIteratorTypeAtTimestamp | |
shardIteratorTypeSequenceNumber := kinesis.ShardIteratorTypeAfterSequenceNumber | |
timestamp := time.Now() | |
timestamp = timestamp.Add(-5 * time.Minute) | |
shardIteratorInput := &kinesis.GetShardIteratorInput{ | |
ShardId: shard.ShardId, | |
StreamName: streamName, | |
ShardIteratorType: &shardIteratorTypeTimestamp, | |
Timestamp: ×tamp, | |
} | |
log.Println(fmt.Sprintf("ShardID %s, Start Shard Iterator %s", *shard.ShardId, *shard.SequenceNumberRange.StartingSequenceNumber)) | |
shardIteratorOutput, err := kinesisService.GetShardIterator(shardIteratorInput) | |
if err != nil { | |
log.Printf("GetShardIterator err: %v\n", err) | |
return | |
} | |
query := fmt.Sprintf("INSERT INTO lernin_test.kinesis_test (name, description, id, server_timestamp) VALUES") | |
queryValues := []string{} | |
sequenceNumber := shard.SequenceNumberRange.StartingSequenceNumber | |
limitGetRecords := int64(2) | |
for { | |
kinesisInput := &kinesis.GetRecordsInput{ | |
Limit: &limitGetRecords, | |
ShardIterator: shardIteratorOutput.ShardIterator, | |
} | |
recordsOutput, err := kinesisService.GetRecords(kinesisInput) | |
if err != nil { | |
log.Printf("ShardID %s, GetRecords err: %v\n", *shard.ShardId, err) | |
return | |
} | |
if len(recordsOutput.Records) > 0 { | |
for _, d := range recordsOutput.Records { | |
var fakeEntity FakeEntity | |
err := json.Unmarshal(d.Data, &fakeEntity) | |
if err != nil { | |
log.Printf("GetRecords Unmarshal err: %v\n", err) | |
return | |
} | |
log.Printf("%v \n", fakeEntity) | |
queryValues = append(queryValues, fmt.Sprintf("('%s', '%s', %d, '%s')", fakeEntity.Name, fakeEntity.Description, fakeEntity.ID, time.Now().UTC().Format("2006-01-02T15:04:05-0700"))) | |
sequenceNumber = d.SequenceNumber | |
} | |
} else { | |
break | |
} | |
shardIteratorInput.StartingSequenceNumber = sequenceNumber | |
shardIteratorInput.ShardIteratorType = &shardIteratorTypeSequenceNumber | |
shardIteratorOutput, err = kinesisService.GetShardIterator(shardIteratorInput) | |
if err != nil { | |
log.Printf("ShardID %s, GetShardIterator err: %v\n", *shard.ShardId, err) | |
return | |
} | |
} | |
var insetsStatement string | |
for i := 1; i < len(queryValues); i++ { | |
if i == 1 { | |
insetsStatement = queryValues[i-1] | |
} | |
insetsStatement = fmt.Sprintf("%s, %s", insetsStatement, queryValues[i]) | |
} | |
query = fmt.Sprintf("%s %s;", query, insetsStatement) | |
_, err = redshift.Exec(query) | |
if err != nil { | |
log.Printf("Query Errored: %s \n", query) | |
log.Printf("Multi Insert redshift err: %v\n", err) | |
return | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment