-
-
Save billhathaway/5e1c0642d548924fb97b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package lambda helps create workers that run in AWS’ Lambda service.
// The Lambda service is designed to run Node.js programs, so we have a very thin
// Node.js wrapper which is essentially an adapter between our Go worker programs
// and Lambda. Lambda runs our Node.js wrapper/adapter, which in turn runs
// our Go worker programs as child processes, communicating with them via stdio pipes
// and Unix exit codes.
//
// The interaction between the Lambda wrapper and our Go programs works like this:
//
// * The Node.js function is invoked by Lambda. Lambda passes an `event` parameter to
//   the function which is an object containing, among other things, a `Records` key,
//   the value of which is an array of objects, each of which has the key `kinesis`,
//   which is an object which has the key `data`, which is a base64-encoded Kinesis record.
// * The Node.js function then spawns a Go worker as a child process, encodes the `event`
//   object to a JSON string, and sends the JSON string to the Go worker process via stdin.
// * The Go worker attempts to parse the JSON string, deserialize each Kinesis record from
//   Base64 and Avro, and then process each record.
// * If the Go worker is able to successfully process every record in the event, it will
//   write nothing to stdout or stderr and will exit with exit code 0. The Node.js function will
//   then call `context.succeed()` with no parameters.
// * If the Go worker is unable to process any of the records in the event, or encounters any
//   other fatal error (such as a deserialization error), it will write a description of the error
//   along with any pertinent debugging information to stderr in a free-form text format and then
//   exit with exit code 1. The Node.js function will then call `context.fail(error)` with the
//   value of `error` being the text that the Go worker wrote to stderr.
//
// TBD:
// * Logging. Maybe we should just use stdout? Or... have the Go workers write to CloudTrail,
//   or CloudWatch, or Logentries?
package lambda | |
import ( | |
"encoding/base64" | |
"encoding/json" | |
"errors" | |
"io/ioutil" | |
"os" | |
"github.com/timehop/golog/log" | |
"github.com/timehop/streams/models/events/appopen" | |
) | |
type Record struct { | |
Kinesis struct { | |
Data string `json:"data"` | |
PartitionKey string `json:"partitionKey"` | |
SequenceNumber string `json:"sequenceNumber"` | |
} `json:"kinesis"` | |
} | |
type Event struct { | |
Records []Record | |
} | |
type Processor func(appopen.AppOpen, log.Logger) error | |
func ReadAndProcessEventThenExit(streamName string, processor Processor) { | |
logger := *log.NewWithID(streamName) | |
event, err := ReadEvent(logger) | |
if err != nil { | |
panic(err) | |
} | |
err = ProcessEvent(*event, processor, logger) | |
if err != nil { | |
os.Stderr.WriteString(err.Error()) | |
os.Exit(1) | |
} | |
os.Exit(0) // TODO: send something to stdout? Log something? | |
} | |
func ReadEvent(logger log.Logger) (*Event, error) { | |
bytes, err := ioutil.ReadAll(os.Stdin) // TBD whether this makes sense... | |
if err != nil { | |
return nil, err | |
} | |
var event Event | |
err = json.Unmarshal(bytes, &event) | |
if err != nil { | |
return nil, err | |
} | |
if len(event.Records) == 0 { | |
return nil, errors.New("malformed event contains no records!") | |
} | |
logger.Debug("Read event from stdin", "event", event) | |
return &event, nil | |
} | |
func ProcessEvent(event Event, processor Processor, logger log.Logger) error { | |
var err error | |
for _, record := range event.Records { | |
if record.Kinesis.Data == "" { | |
logger.Error("record key 'data' is empty", "record", record) | |
return errors.New("record key 'data' is empty") | |
} | |
avrobytes, err := base64.StdEncoding.DecodeString(record.Kinesis.Data) | |
if err != nil { | |
return errors.New("could not decode data from Base64: " + err.Error()) | |
} | |
appOpen, err := appopen.FromAvro(avrobytes) | |
if err != nil { | |
return err | |
} | |
// The main event | |
err = processor(*appOpen, logger) | |
if err != nil { | |
return err | |
} | |
} | |
return err | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var child_process = require('child_process'); | |
// Configuration used when the handler is invoked without a test config.
// `child_path` is the executable to spawn; `env` becomes the child's entire
// environment. The host and key values are placeholders that must be filled
// in before deployment.
var prod_config = {
  'child_path': './lastopens',
  'env': {
    'REDIS_URL': 'redis://INSERT_ACTUAL_PROD_HOST_HERE:6379',
    'STATHAT_KEY': 'INSERT_ACTUAL_PROD_KEY_HERE',
    'LOG_LEVEL': 'DEBUG'
  }
};
// When running in prod, Lambda passes only 2 params, so we use the | |
// above prod config. But in test, our test harness passes in a | |
// test config as the third param. | |
exports.handler = function(event, context, test_config) { | |
var config = test_config || prod_config; | |
var options = { | |
env: config.env, | |
input: JSON.stringify(event) | |
} | |
console.log("Calling ", config.child_path, "with options:", options) | |
var result = child_process.spawnSync(config.child_path, [], options); | |
if (result.status !== 0) { | |
console.log(result.stdout.toString()); | |
console.log("Child process exited with non-zero code:", result.status); | |
return context.fail(new Error(result.stderr.toString())); | |
} | |
context.succeed(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment