Skip to content

Instantly share code, notes, and snippets.

@flavioribeiro
Created December 14, 2016 21:19
Show Gist options
  • Save flavioribeiro/42be4b639614125207101622e2561c38 to your computer and use it in GitHub Desktop.
Save flavioribeiro/42be4b639614125207101622e2561c38 to your computer and use it in GitHub Desktop.
package service
import (
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/NYTimes/gizmo/server"
"github.com/Sirupsen/logrus"
"github.com/nytm/video-distribution-api/config"
"github.com/nytm/video-distribution-api/db"
"github.com/nytm/video-distribution-api/db/memory"
"github.com/nytm/video-distribution-api/driver"
)
func TestNewDistributionService(t *testing.T) {
cfg := config.Config{
Aspera: &config.Aspera{
SSHKeyFile: "/root/.ssh/id_rsa.pub",
SSHPort: 22,
SSHServer: "localhost",
SSHUser: "me",
},
Server: &server.Config{},
}
logger := logrus.New()
service, err := NewDistributionService(&cfg, logger)
if err != nil {
t.Fatal(err)
}
if service == nil {
t.Fatal("unexpected <nil> service")
}
if service.driver == nil {
t.Fatal("unexpected <nil> driver")
}
if cfg.Server.HealthCheckType != "custom" {
t.Errorf("Did not set custom healthcheck. Want %q. Got %q", "custom", cfg.Server.HealthCheckType)
}
if service.logger != logger {
t.Errorf("wrong logger\nwant %#v\ngot %#v", logger, service.logger)
}
}
func TestNewDistributionServiceAsperaError(t *testing.T) {
service, err := NewDistributionService(&config.Config{Aspera: &config.Aspera{}}, nil)
if service != nil {
t.Errorf("Got unexpected non-nil service instance: %#v", service)
}
if err == nil {
t.Error("Got unexpected nil error")
}
}
func TestNewDistributionServiceWithAkamaiPurger(t *testing.T) {
cfg := config.Config{
Aspera: &config.Aspera{
SSHKeyFile: "/root/.ssh/id_rsa.pub",
SSHPort: 22,
SSHServer: "localhost",
SSHUser: "me",
},
Akamai: &config.Akamai{
Username: "[email protected]",
Password: "123456",
},
Server: &server.Config{},
}
service, err := NewDistributionService(&cfg, logrus.New())
if err != nil {
t.Fatal(err)
}
if service == nil {
t.Fatal("unexpected <nil> service")
}
if service.purger == nil {
t.Fatal("unexpected <nil> purger")
}
if cfg.Server.HealthCheckType != "custom" {
t.Errorf("Did not set custom healthcheck. Want %q. Got %q", "custom", cfg.Server.HealthCheckType)
}
}
func TestNewDistributionServiceErrorInAkamaiPurgerConfig(t *testing.T) {
cfg := config.Config{
Aspera: &config.Aspera{
SSHKeyFile: "/root/.ssh/id_rsa.pub",
SSHPort: 22,
SSHServer: "localhost",
SSHUser: "me",
},
Akamai: &config.Akamai{
Username: "[email protected]",
},
Server: &server.Config{},
}
service, err := NewDistributionService(&cfg, nil)
if service != nil {
t.Errorf("got unexpected non-nil service")
}
if err == nil {
t.Error("got unexpected <nil> error")
}
}
func TestTransfer(t *testing.T) {
var tests = []struct {
testCase string
reqBody string
bucketPath string
prepErr map[string]error
code int
respBody map[string]interface{}
transfers []fakeTransfer
fileList []string
finalState db.State
}{
{
"new transfer",
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`,
"",
nil,
http.StatusOK,
map[string]interface{}{
"transferId": "",
"state": "started",
"started": "",
"finished": "0001-01-01T00:00:00Z",
"fileList": []interface{}{},
},
[]fakeTransfer{
{
dst: "/public",
src: []string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"},
},
},
[]string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"},
db.StateFinished,
},
{
"new transfer with bucket override",
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`,
"/mnt/mybucket",
nil,
http.StatusOK,
map[string]interface{}{
"transferId": "",
"state": "started",
"started": "",
"finished": "0001-01-01T00:00:00Z",
"fileList": []interface{}{},
},
[]fakeTransfer{
{
dst: "/public",
src: []string{"/mnt/mybucket/folder/video_720p.mp4", "/mnt/mybucket/folder/video_1080p.mp4"},
},
},
[]string{"/mnt/mybucket/folder/video_720p.mp4", "/mnt/mybucket/folder/video_1080p.mp4"},
db.StateFinished,
},
{
"new transfer with bucket override - verbose format",
`{"src":["s3://mybucket.s3.amazonaws.com/folder/video_720p.mp4", "s3://mybucket.s3.amazonaws.com/folder/video_1080p.mp4"],"dst":"/public"}`,
"/mnt/bucketpath",
nil,
http.StatusOK,
map[string]interface{}{
"transferId": "",
"state": "started",
"started": "",
"finished": "0001-01-01T00:00:00Z",
"fileList": []interface{}{},
},
[]fakeTransfer{
{
dst: "/public",
src: []string{"/mnt/bucketpath/folder/video_720p.mp4", "/mnt/bucketpath/folder/video_1080p.mp4"},
},
},
[]string{"/mnt/bucketpath/folder/video_720p.mp4", "/mnt/bucketpath/folder/video_1080p.mp4"},
db.StateFinished,
},
{
"new transfer error in driver",
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`,
"",
map[string]error{"Transfer": errors.New("something went wrong")},
http.StatusOK,
map[string]interface{}{
"transferId": "",
"state": "started",
"started": "",
"finished": "0001-01-01T00:00:00Z",
"fileList": []interface{}{},
},
[]fakeTransfer{
{
dst: "/public",
src: []string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"},
},
},
nil,
db.StateFailed,
},
{
"new transfer internal error in driver",
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`,
"",
map[string]error{"Transfer": &driver.InternalError{Err: errors.New("something went wrong")}},
http.StatusOK,
map[string]interface{}{
"transferId": "",
"state": "started",
"started": "",
"finished": "0001-01-01T00:00:00Z",
"fileList": []interface{}{},
},
[]fakeTransfer{
{
dst: "/public",
src: []string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"},
},
},
nil,
db.StateFailed,
},
{
"new transfer missing dst",
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"]}`,
"",
nil,
http.StatusBadRequest,
map[string]interface{}{"error": "missing dst from request"},
nil,
nil,
db.StateFailed,
},
{
"new transfer missing src",
`{"src":null,"dst":"/public"}`,
"",
nil,
http.StatusBadRequest,
map[string]interface{}{"error": "missing src from request"},
nil,
nil,
db.StateFailed,
},
{
"new transfer invalid body",
`src=s3://mybucket/file.mp4&dst=/public`,
"",
nil,
http.StatusBadRequest,
map[string]interface{}{
"error": "could not parse json input: invalid character 's' looking for beginning of value",
},
nil,
nil,
db.StateFailed,
},
}
for _, test := range tests {
srvr := server.NewSimpleServer(nil)
repo := memory.NewMemoryRepository()
service := &DistributionService{
config: &config.Config{
Server: &server.Config{RouterType: "fast"},
Aspera: &config.Aspera{BucketLocalPath: test.bucketPath},
},
repo: repo,
logger: logrus.New(),
}
var calls int64
driver := fakeDriver{
callback: func(string, []string) {
atomic.AddInt64(&calls, 1)
},
}
abort := make(chan struct{})
transferFinished := make(chan struct{}, len(test.transfers))
go func() {
for atomic.LoadInt64(&calls) != int64(len(test.transfers)) {
select {
case <-abort:
return
default:
runtime.Gosched()
}
}
close(transferFinished)
}()
for op, pErr := range test.prepErr {
driver.PrepareError(op, pErr)
}
service.driver = &driver
srvr.Register(service)
r, _ := http.NewRequest("POST", "/transfers", strings.NewReader(test.reqBody))
r.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
srvr.ServeHTTP(w, r)
if w.Code != test.code {
t.Errorf("%s: wrong status code. Want %d. Got %d", test.testCase, test.code, w.Code)
}
var gotBody map[string]interface{}
err := json.NewDecoder(w.Body).Decode(&gotBody)
if err != nil {
t.Errorf("%s: %s", test.testCase, err)
}
var transferID string
if _, ok := test.respBody["transferId"]; ok {
transferID = gotBody["transferId"].(string)
transfer, err := repo.GetTransfer(transferID)
if err != nil {
t.Fatal(err)
}
test.respBody["transferId"] = transfer.ID
started, _ := json.Marshal(transfer.Started)
test.respBody["started"] = strings.Trim(string(started), `"`)
}
if !reflect.DeepEqual(gotBody, test.respBody) {
t.Errorf("%s: wrong body\nWant %#v\nGot %#v", test.testCase, test.respBody, gotBody)
}
select {
case <-transferFinished:
case <-time.After(2 * time.Second):
close(abort)
t.Errorf("failed to finish transfers in 2 seconds")
}
if !reflect.DeepEqual(driver.transfers, test.transfers) {
t.Errorf("%s: wrong transfers.\nWant %#v\nGot %#v", test.testCase, test.transfers, driver.transfers)
}
if transferID != "" {
transfer, err := repo.GetTransfer(transferID)
// wait for the database to update
for i := 1; i < 3 && err == nil && transfer.State != test.finalState; i++ {
time.Sleep(10 * time.Millisecond)
transfer, err = repo.GetTransfer(transferID)
}
if err != nil {
t.Fatal(err)
}
if transfer.State != test.finalState {
t.Errorf("%s: wrong state. Want %q. Got %q", test.testCase, test.finalState, transfer.State)
}
if !reflect.DeepEqual(transfer.FileList, test.fileList) {
t.Errorf("%s: wrong file list\nWant %#v\nGot %#v", test.testCase, test.fileList, transfer.FileList)
}
if transfer.Finished.IsZero() {
t.Errorf("%s: unexpected zero time in transfer.Finished", test.testCase)
}
}
}
}
func TestTransferWithPurger(t *testing.T) {
srvr := server.NewSimpleServer(nil)
repo := memory.NewMemoryRepository()
service := &DistributionService{
config: &config.Config{
Server: &server.Config{RouterType: "fast"},
Aspera: &config.Aspera{
BucketLocalPath: "/mnt/bucket",
},
},
repo: repo,
logger: logrus.New(),
}
var purged struct {
urls []string
sync.Mutex
}
urlsChan := make(chan string, 10)
exit := make(chan struct{}, 1)
var driver fakeDriver
purger := fakePurger{
hook: func(url string) {
urlsChan <- url
},
}
go func() {
defer close(exit)
for url := range urlsChan {
purged.Lock()
purged.urls = append(purged.urls, url)
if len(purged.urls) == 3 {
purged.Unlock()
break
}
purged.Unlock()
}
}()
service.driver = &driver
service.purger = &purger
srvr.Register(service)
reqData := `{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4", "s3://mybucket/folder/subfolder1", "s3://mybucket/anotherfolder/video_720p.mp4", "s3://mybucket/anotherfolder/subfolder", "s3://mybucket/anotherfolder2"],"dst":"/public"}`
r, _ := http.NewRequest("POST", "/transfers", strings.NewReader(reqData))
r.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
srvr.ServeHTTP(w, r)
if w.Code != http.StatusOK {
t.Errorf("wrong status code. Want %d. Got %d", http.StatusOK, w.Code)
}
select {
case <-exit:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for purger to finish")
}
expectedURLs := []string{"/mnt/bucket/anotherfolder/", "/mnt/bucket/anotherfolder2/", "/mnt/bucket/folder/"}
if !reflect.DeepEqual(purged.urls, expectedURLs) {
t.Errorf("wrong URLs purged\nWant %#v\nGot %#v", expectedURLs, purged.urls)
}
}
func TestGetTransfer(t *testing.T) {
repo := memory.NewMemoryRepository()
startedTime := time.Now().Truncate(time.Millisecond).UTC().Add(-time.Hour)
finishedTime := startedTime.Add(3 * time.Minute)
err := repo.CreateTransfer(&db.Transfer{
ID: "some-transfer",
Destination: "/123456/video",
Source: []string{"s3://mybucket/abc-123/video1.mp4", "s3://mybucket/abc-123/video2.mp4"},
FileList: []string{"/123456/video/video1.mp4", "/123456/video/video2.mp4"},
Started: startedTime,
Finished: finishedTime,
State: db.StateFinished,
})
if err != nil {
t.Fatal(err)
}
marshaledStarted, _ := json.Marshal(startedTime)
marshaledFinished, _ := json.Marshal(finishedTime)
var tests = []struct {
testCase string
transferID string
code int
respBody map[string]interface{}
}{
{
"existing transfer",
"some-transfer",
http.StatusOK,
map[string]interface{}{
"transferId": "some-transfer",
"fileList": []interface{}{"/123456/video/video1.mp4", "/123456/video/video2.mp4"},
"started": strings.Trim(string(marshaledStarted), `"`),
"finished": strings.Trim(string(marshaledFinished), `"`),
"state": string(db.StateFinished),
},
},
{
"transfer not found",
"transfer-that-does-not-exist",
http.StatusNotFound,
map[string]interface{}{"error": "transfer not found"},
},
}
for _, test := range tests {
serverCfg := server.Config{RouterType: "fast"}
srvr := server.NewSimpleServer(&serverCfg)
service := &DistributionService{
config: &config.Config{Server: &serverCfg},
repo: repo,
logger: logrus.New(),
}
srvr.Register(service)
r, _ := http.NewRequest("GET", "/transfers/"+test.transferID, nil)
w := httptest.NewRecorder()
srvr.ServeHTTP(w, r)
if w.Code != test.code {
t.Errorf("%s: wrong status code. Want %d. Got %d", test.testCase, test.code, w.Code)
}
var gotBody map[string]interface{}
err = json.Unmarshal(w.Body.Bytes(), &gotBody)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotBody, test.respBody) {
t.Errorf("wrong response body\nWant %#v\nGot %#v", test.respBody, gotBody)
}
}
}
func TestHealthcheck(t *testing.T) {
var tests = []struct {
testCase string
mountPoint string
prepErr map[string]error
code int
respBodyRegexp string
}{
{
"success healthcheck",
"",
nil,
http.StatusOK,
"ok-" + server.Name,
},
{
"success healthcheck with ok mount point",
"testdata/bucket",
nil,
http.StatusOK,
"ok-" + server.Name,
},
{
"failure healthcheck (driver error)",
"",
map[string]error{
"Healthcheck": errors.New("something went wrong"),
},
http.StatusInternalServerError,
"^something went wrong$",
},
{
"failure healthcheck (non-existing mount point)",
"testdata/path/that/does-not/exist",
nil,
http.StatusInternalServerError,
"^issue with bucket local path: open testdata/path/that/does-not/exist: no such file or directory$",
},
{
"failure healthcheck (non-dir mount point)",
"testdata/bucket/folder1/fake_video.mp4",
nil,
http.StatusInternalServerError,
"^issue with bucket local path: readdirent: .*$",
},
}
for _, test := range tests {
var driver fakeDriver
for op, pErr := range test.prepErr {
driver.PrepareError(op, pErr)
}
service := DistributionService{
config: &config.Config{
Server: &server.Config{RouterType: "fast"},
Aspera: &config.Aspera{BucketLocalPath: test.mountPoint},
},
logger: logrus.New(),
}
srvr := server.NewSimpleServer(&server.Config{
CustomHealthCheckHandler: http.HandlerFunc(service.healthcheck),
HealthCheckType: "custom",
})
service.driver = &driver
srvr.Register(&service)
// Start and stop to run Gizmo's healthcheck registration process.
srvr.Start()
srvr.Stop()
r, _ := http.NewRequest("GET", "/status.txt", nil)
w := httptest.NewRecorder()
srvr.ServeHTTP(w, r)
if w.Code != test.code {
t.Errorf("%s: wrong status code. Want %d. Got %d", test.testCase, test.code, w.Code)
}
bodyRegexp := regexp.MustCompile(test.respBodyRegexp)
if !bodyRegexp.MatchString(strings.TrimRight(w.Body.String(), "\n")) {
t.Errorf("%s: wrong body.\nRegexp %s\nGot %s", test.testCase, bodyRegexp, w.Body.String())
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment