Created
December 14, 2016 21:19
-
-
Save flavioribeiro/42be4b639614125207101622e2561c38 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package service | |
import ( | |
"encoding/json" | |
"errors" | |
"net/http" | |
"net/http/httptest" | |
"reflect" | |
"regexp" | |
"runtime" | |
"strings" | |
"sync" | |
"sync/atomic" | |
"testing" | |
"time" | |
"github.com/NYTimes/gizmo/server" | |
"github.com/Sirupsen/logrus" | |
"github.com/nytm/video-distribution-api/config" | |
"github.com/nytm/video-distribution-api/db" | |
"github.com/nytm/video-distribution-api/db/memory" | |
"github.com/nytm/video-distribution-api/driver" | |
) | |
func TestNewDistributionService(t *testing.T) { | |
cfg := config.Config{ | |
Aspera: &config.Aspera{ | |
SSHKeyFile: "/root/.ssh/id_rsa.pub", | |
SSHPort: 22, | |
SSHServer: "localhost", | |
SSHUser: "me", | |
}, | |
Server: &server.Config{}, | |
} | |
logger := logrus.New() | |
service, err := NewDistributionService(&cfg, logger) | |
if err != nil { | |
t.Fatal(err) | |
} | |
if service == nil { | |
t.Fatal("unexpected <nil> service") | |
} | |
if service.driver == nil { | |
t.Fatal("unexpected <nil> driver") | |
} | |
if cfg.Server.HealthCheckType != "custom" { | |
t.Errorf("Did not set custom healthcheck. Want %q. Got %q", "custom", cfg.Server.HealthCheckType) | |
} | |
if service.logger != logger { | |
t.Errorf("wrong logger\nwant %#v\ngot %#v", logger, service.logger) | |
} | |
} | |
func TestNewDistributionServiceAsperaError(t *testing.T) { | |
service, err := NewDistributionService(&config.Config{Aspera: &config.Aspera{}}, nil) | |
if service != nil { | |
t.Errorf("Got unexpected non-nil service instance: %#v", service) | |
} | |
if err == nil { | |
t.Error("Got unexpected nil error") | |
} | |
} | |
func TestNewDistributionServiceWithAkamaiPurger(t *testing.T) { | |
cfg := config.Config{ | |
Aspera: &config.Aspera{ | |
SSHKeyFile: "/root/.ssh/id_rsa.pub", | |
SSHPort: 22, | |
SSHServer: "localhost", | |
SSHUser: "me", | |
}, | |
Akamai: &config.Akamai{ | |
Username: "[email protected]", | |
Password: "123456", | |
}, | |
Server: &server.Config{}, | |
} | |
service, err := NewDistributionService(&cfg, logrus.New()) | |
if err != nil { | |
t.Fatal(err) | |
} | |
if service == nil { | |
t.Fatal("unexpected <nil> service") | |
} | |
if service.purger == nil { | |
t.Fatal("unexpected <nil> purger") | |
} | |
if cfg.Server.HealthCheckType != "custom" { | |
t.Errorf("Did not set custom healthcheck. Want %q. Got %q", "custom", cfg.Server.HealthCheckType) | |
} | |
} | |
func TestNewDistributionServiceErrorInAkamaiPurgerConfig(t *testing.T) { | |
cfg := config.Config{ | |
Aspera: &config.Aspera{ | |
SSHKeyFile: "/root/.ssh/id_rsa.pub", | |
SSHPort: 22, | |
SSHServer: "localhost", | |
SSHUser: "me", | |
}, | |
Akamai: &config.Akamai{ | |
Username: "[email protected]", | |
}, | |
Server: &server.Config{}, | |
} | |
service, err := NewDistributionService(&cfg, nil) | |
if service != nil { | |
t.Errorf("got unexpected non-nil service") | |
} | |
if err == nil { | |
t.Error("got unexpected <nil> error") | |
} | |
} | |
func TestTransfer(t *testing.T) { | |
var tests = []struct { | |
testCase string | |
reqBody string | |
bucketPath string | |
prepErr map[string]error | |
code int | |
respBody map[string]interface{} | |
transfers []fakeTransfer | |
fileList []string | |
finalState db.State | |
}{ | |
{ | |
"new transfer", | |
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`, | |
"", | |
nil, | |
http.StatusOK, | |
map[string]interface{}{ | |
"transferId": "", | |
"state": "started", | |
"started": "", | |
"finished": "0001-01-01T00:00:00Z", | |
"fileList": []interface{}{}, | |
}, | |
[]fakeTransfer{ | |
{ | |
dst: "/public", | |
src: []string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"}, | |
}, | |
}, | |
[]string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"}, | |
db.StateFinished, | |
}, | |
{ | |
"new transfer with bucket override", | |
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`, | |
"/mnt/mybucket", | |
nil, | |
http.StatusOK, | |
map[string]interface{}{ | |
"transferId": "", | |
"state": "started", | |
"started": "", | |
"finished": "0001-01-01T00:00:00Z", | |
"fileList": []interface{}{}, | |
}, | |
[]fakeTransfer{ | |
{ | |
dst: "/public", | |
src: []string{"/mnt/mybucket/folder/video_720p.mp4", "/mnt/mybucket/folder/video_1080p.mp4"}, | |
}, | |
}, | |
[]string{"/mnt/mybucket/folder/video_720p.mp4", "/mnt/mybucket/folder/video_1080p.mp4"}, | |
db.StateFinished, | |
}, | |
{ | |
"new transfer with bucket override - verbose format", | |
`{"src":["s3://mybucket.s3.amazonaws.com/folder/video_720p.mp4", "s3://mybucket.s3.amazonaws.com/folder/video_1080p.mp4"],"dst":"/public"}`, | |
"/mnt/bucketpath", | |
nil, | |
http.StatusOK, | |
map[string]interface{}{ | |
"transferId": "", | |
"state": "started", | |
"started": "", | |
"finished": "0001-01-01T00:00:00Z", | |
"fileList": []interface{}{}, | |
}, | |
[]fakeTransfer{ | |
{ | |
dst: "/public", | |
src: []string{"/mnt/bucketpath/folder/video_720p.mp4", "/mnt/bucketpath/folder/video_1080p.mp4"}, | |
}, | |
}, | |
[]string{"/mnt/bucketpath/folder/video_720p.mp4", "/mnt/bucketpath/folder/video_1080p.mp4"}, | |
db.StateFinished, | |
}, | |
{ | |
"new transfer error in driver", | |
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`, | |
"", | |
map[string]error{"Transfer": errors.New("something went wrong")}, | |
http.StatusOK, | |
map[string]interface{}{ | |
"transferId": "", | |
"state": "started", | |
"started": "", | |
"finished": "0001-01-01T00:00:00Z", | |
"fileList": []interface{}{}, | |
}, | |
[]fakeTransfer{ | |
{ | |
dst: "/public", | |
src: []string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"}, | |
}, | |
}, | |
nil, | |
db.StateFailed, | |
}, | |
{ | |
"new transfer internal error in driver", | |
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"],"dst":"/public"}`, | |
"", | |
map[string]error{"Transfer": &driver.InternalError{Err: errors.New("something went wrong")}}, | |
http.StatusOK, | |
map[string]interface{}{ | |
"transferId": "", | |
"state": "started", | |
"started": "", | |
"finished": "0001-01-01T00:00:00Z", | |
"fileList": []interface{}{}, | |
}, | |
[]fakeTransfer{ | |
{ | |
dst: "/public", | |
src: []string{"s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"}, | |
}, | |
}, | |
nil, | |
db.StateFailed, | |
}, | |
{ | |
"new transfer missing dst", | |
`{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4"]}`, | |
"", | |
nil, | |
http.StatusBadRequest, | |
map[string]interface{}{"error": "missing dst from request"}, | |
nil, | |
nil, | |
db.StateFailed, | |
}, | |
{ | |
"new transfer missing src", | |
`{"src":null,"dst":"/public"}`, | |
"", | |
nil, | |
http.StatusBadRequest, | |
map[string]interface{}{"error": "missing src from request"}, | |
nil, | |
nil, | |
db.StateFailed, | |
}, | |
{ | |
"new transfer invalid body", | |
`src=s3://mybucket/file.mp4&dst=/public`, | |
"", | |
nil, | |
http.StatusBadRequest, | |
map[string]interface{}{ | |
"error": "could not parse json input: invalid character 's' looking for beginning of value", | |
}, | |
nil, | |
nil, | |
db.StateFailed, | |
}, | |
} | |
for _, test := range tests { | |
srvr := server.NewSimpleServer(nil) | |
repo := memory.NewMemoryRepository() | |
service := &DistributionService{ | |
config: &config.Config{ | |
Server: &server.Config{RouterType: "fast"}, | |
Aspera: &config.Aspera{BucketLocalPath: test.bucketPath}, | |
}, | |
repo: repo, | |
logger: logrus.New(), | |
} | |
var calls int64 | |
driver := fakeDriver{ | |
callback: func(string, []string) { | |
atomic.AddInt64(&calls, 1) | |
}, | |
} | |
abort := make(chan struct{}) | |
transferFinished := make(chan struct{}, len(test.transfers)) | |
go func() { | |
for atomic.LoadInt64(&calls) != int64(len(test.transfers)) { | |
select { | |
case <-abort: | |
return | |
default: | |
runtime.Gosched() | |
} | |
} | |
close(transferFinished) | |
}() | |
for op, pErr := range test.prepErr { | |
driver.PrepareError(op, pErr) | |
} | |
service.driver = &driver | |
srvr.Register(service) | |
r, _ := http.NewRequest("POST", "/transfers", strings.NewReader(test.reqBody)) | |
r.Header.Set("Content-Type", "application/json") | |
w := httptest.NewRecorder() | |
srvr.ServeHTTP(w, r) | |
if w.Code != test.code { | |
t.Errorf("%s: wrong status code. Want %d. Got %d", test.testCase, test.code, w.Code) | |
} | |
var gotBody map[string]interface{} | |
err := json.NewDecoder(w.Body).Decode(&gotBody) | |
if err != nil { | |
t.Errorf("%s: %s", test.testCase, err) | |
} | |
var transferID string | |
if _, ok := test.respBody["transferId"]; ok { | |
transferID = gotBody["transferId"].(string) | |
transfer, err := repo.GetTransfer(transferID) | |
if err != nil { | |
t.Fatal(err) | |
} | |
test.respBody["transferId"] = transfer.ID | |
started, _ := json.Marshal(transfer.Started) | |
test.respBody["started"] = strings.Trim(string(started), `"`) | |
} | |
if !reflect.DeepEqual(gotBody, test.respBody) { | |
t.Errorf("%s: wrong body\nWant %#v\nGot %#v", test.testCase, test.respBody, gotBody) | |
} | |
select { | |
case <-transferFinished: | |
case <-time.After(2 * time.Second): | |
close(abort) | |
t.Errorf("failed to finish transfers in 2 seconds") | |
} | |
if !reflect.DeepEqual(driver.transfers, test.transfers) { | |
t.Errorf("%s: wrong transfers.\nWant %#v\nGot %#v", test.testCase, test.transfers, driver.transfers) | |
} | |
if transferID != "" { | |
transfer, err := repo.GetTransfer(transferID) | |
// wait for the database to update | |
for i := 1; i < 3 && err == nil && transfer.State != test.finalState; i++ { | |
time.Sleep(10 * time.Millisecond) | |
transfer, err = repo.GetTransfer(transferID) | |
} | |
if err != nil { | |
t.Fatal(err) | |
} | |
if transfer.State != test.finalState { | |
t.Errorf("%s: wrong state. Want %q. Got %q", test.testCase, test.finalState, transfer.State) | |
} | |
if !reflect.DeepEqual(transfer.FileList, test.fileList) { | |
t.Errorf("%s: wrong file list\nWant %#v\nGot %#v", test.testCase, test.fileList, transfer.FileList) | |
} | |
if transfer.Finished.IsZero() { | |
t.Errorf("%s: unexpected zero time in transfer.Finished", test.testCase) | |
} | |
} | |
} | |
} | |
func TestTransferWithPurger(t *testing.T) { | |
srvr := server.NewSimpleServer(nil) | |
repo := memory.NewMemoryRepository() | |
service := &DistributionService{ | |
config: &config.Config{ | |
Server: &server.Config{RouterType: "fast"}, | |
Aspera: &config.Aspera{ | |
BucketLocalPath: "/mnt/bucket", | |
}, | |
}, | |
repo: repo, | |
logger: logrus.New(), | |
} | |
var purged struct { | |
urls []string | |
sync.Mutex | |
} | |
urlsChan := make(chan string, 10) | |
exit := make(chan struct{}, 1) | |
var driver fakeDriver | |
purger := fakePurger{ | |
hook: func(url string) { | |
urlsChan <- url | |
}, | |
} | |
go func() { | |
defer close(exit) | |
for url := range urlsChan { | |
purged.Lock() | |
purged.urls = append(purged.urls, url) | |
if len(purged.urls) == 3 { | |
purged.Unlock() | |
break | |
} | |
purged.Unlock() | |
} | |
}() | |
service.driver = &driver | |
service.purger = &purger | |
srvr.Register(service) | |
reqData := `{"src":["s3://mybucket/folder/video_720p.mp4", "s3://mybucket/folder/video_1080p.mp4", "s3://mybucket/folder/subfolder1", "s3://mybucket/anotherfolder/video_720p.mp4", "s3://mybucket/anotherfolder/subfolder", "s3://mybucket/anotherfolder2"],"dst":"/public"}` | |
r, _ := http.NewRequest("POST", "/transfers", strings.NewReader(reqData)) | |
r.Header.Set("Content-Type", "application/json") | |
w := httptest.NewRecorder() | |
srvr.ServeHTTP(w, r) | |
if w.Code != http.StatusOK { | |
t.Errorf("wrong status code. Want %d. Got %d", http.StatusOK, w.Code) | |
} | |
select { | |
case <-exit: | |
case <-time.After(2 * time.Second): | |
t.Fatal("timed out waiting for purger to finish") | |
} | |
expectedURLs := []string{"/mnt/bucket/anotherfolder/", "/mnt/bucket/anotherfolder2/", "/mnt/bucket/folder/"} | |
if !reflect.DeepEqual(purged.urls, expectedURLs) { | |
t.Errorf("wrong URLs purged\nWant %#v\nGot %#v", expectedURLs, purged.urls) | |
} | |
} | |
func TestGetTransfer(t *testing.T) { | |
repo := memory.NewMemoryRepository() | |
startedTime := time.Now().Truncate(time.Millisecond).UTC().Add(-time.Hour) | |
finishedTime := startedTime.Add(3 * time.Minute) | |
err := repo.CreateTransfer(&db.Transfer{ | |
ID: "some-transfer", | |
Destination: "/123456/video", | |
Source: []string{"s3://mybucket/abc-123/video1.mp4", "s3://mybucket/abc-123/video2.mp4"}, | |
FileList: []string{"/123456/video/video1.mp4", "/123456/video/video2.mp4"}, | |
Started: startedTime, | |
Finished: finishedTime, | |
State: db.StateFinished, | |
}) | |
if err != nil { | |
t.Fatal(err) | |
} | |
marshaledStarted, _ := json.Marshal(startedTime) | |
marshaledFinished, _ := json.Marshal(finishedTime) | |
var tests = []struct { | |
testCase string | |
transferID string | |
code int | |
respBody map[string]interface{} | |
}{ | |
{ | |
"existing transfer", | |
"some-transfer", | |
http.StatusOK, | |
map[string]interface{}{ | |
"transferId": "some-transfer", | |
"fileList": []interface{}{"/123456/video/video1.mp4", "/123456/video/video2.mp4"}, | |
"started": strings.Trim(string(marshaledStarted), `"`), | |
"finished": strings.Trim(string(marshaledFinished), `"`), | |
"state": string(db.StateFinished), | |
}, | |
}, | |
{ | |
"transfer not found", | |
"transfer-that-does-not-exist", | |
http.StatusNotFound, | |
map[string]interface{}{"error": "transfer not found"}, | |
}, | |
} | |
for _, test := range tests { | |
serverCfg := server.Config{RouterType: "fast"} | |
srvr := server.NewSimpleServer(&serverCfg) | |
service := &DistributionService{ | |
config: &config.Config{Server: &serverCfg}, | |
repo: repo, | |
logger: logrus.New(), | |
} | |
srvr.Register(service) | |
r, _ := http.NewRequest("GET", "/transfers/"+test.transferID, nil) | |
w := httptest.NewRecorder() | |
srvr.ServeHTTP(w, r) | |
if w.Code != test.code { | |
t.Errorf("%s: wrong status code. Want %d. Got %d", test.testCase, test.code, w.Code) | |
} | |
var gotBody map[string]interface{} | |
err = json.Unmarshal(w.Body.Bytes(), &gotBody) | |
if err != nil { | |
t.Fatal(err) | |
} | |
if !reflect.DeepEqual(gotBody, test.respBody) { | |
t.Errorf("wrong response body\nWant %#v\nGot %#v", test.respBody, gotBody) | |
} | |
} | |
} | |
func TestHealthcheck(t *testing.T) { | |
var tests = []struct { | |
testCase string | |
mountPoint string | |
prepErr map[string]error | |
code int | |
respBodyRegexp string | |
}{ | |
{ | |
"success healthcheck", | |
"", | |
nil, | |
http.StatusOK, | |
"ok-" + server.Name, | |
}, | |
{ | |
"success healthcheck with ok mount point", | |
"testdata/bucket", | |
nil, | |
http.StatusOK, | |
"ok-" + server.Name, | |
}, | |
{ | |
"failure healthcheck (driver error)", | |
"", | |
map[string]error{ | |
"Healthcheck": errors.New("something went wrong"), | |
}, | |
http.StatusInternalServerError, | |
"^something went wrong$", | |
}, | |
{ | |
"failure healthcheck (non-existing mount point)", | |
"testdata/path/that/does-not/exist", | |
nil, | |
http.StatusInternalServerError, | |
"^issue with bucket local path: open testdata/path/that/does-not/exist: no such file or directory$", | |
}, | |
{ | |
"failure healthcheck (non-dir mount point)", | |
"testdata/bucket/folder1/fake_video.mp4", | |
nil, | |
http.StatusInternalServerError, | |
"^issue with bucket local path: readdirent: .*$", | |
}, | |
} | |
for _, test := range tests { | |
var driver fakeDriver | |
for op, pErr := range test.prepErr { | |
driver.PrepareError(op, pErr) | |
} | |
service := DistributionService{ | |
config: &config.Config{ | |
Server: &server.Config{RouterType: "fast"}, | |
Aspera: &config.Aspera{BucketLocalPath: test.mountPoint}, | |
}, | |
logger: logrus.New(), | |
} | |
srvr := server.NewSimpleServer(&server.Config{ | |
CustomHealthCheckHandler: http.HandlerFunc(service.healthcheck), | |
HealthCheckType: "custom", | |
}) | |
service.driver = &driver | |
srvr.Register(&service) | |
// Start and stop to run Gizmo's healthcheck registration process. | |
srvr.Start() | |
srvr.Stop() | |
r, _ := http.NewRequest("GET", "/status.txt", nil) | |
w := httptest.NewRecorder() | |
srvr.ServeHTTP(w, r) | |
if w.Code != test.code { | |
t.Errorf("%s: wrong status code. Want %d. Got %d", test.testCase, test.code, w.Code) | |
} | |
bodyRegexp := regexp.MustCompile(test.respBodyRegexp) | |
if !bodyRegexp.MatchString(strings.TrimRight(w.Body.String(), "\n")) { | |
t.Errorf("%s: wrong body.\nRegexp %s\nGot %s", test.testCase, bodyRegexp, w.Body.String()) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment