Created
September 14, 2017 10:00
-
-
Save woodsaj/ab06ed4d7f894587ba33fbbca1875299 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/gorilla/handlers" | |
) | |
var ( | |
tr = &http.Transport{} | |
client = &http.Client{Transport: tr} | |
) | |
func main() { | |
r := http.NewServeMux() | |
r.HandleFunc("/getdata", getdata) | |
r.HandleFunc("/render", render) | |
log.Printf("listening on :1111") | |
http.ListenAndServe(":1111", handlers.CombinedLoggingHandler(os.Stdout, r)) | |
} | |
// expect a "id" param in the request. id is an int and we should | |
// wait this many seconds before returning the id back. | |
func getdata(w http.ResponseWriter, req *http.Request) { | |
id := req.FormValue("id") | |
wait, err := strconv.ParseInt(id, 10, 64) | |
if err != nil { | |
writeError(w, http.StatusInternalServerError, err.Error()) | |
return | |
} | |
log.Printf("backend: processing request with id: %s", id) | |
if wait == 2 { | |
writeError(w, http.StatusInternalServerError, "fake error") | |
return | |
} | |
ctx := req.Context() | |
// wait our specified duration or until the request context is cancled. | |
// this will happen when the client closes the connection before we send our response | |
select { | |
case <-time.After(time.Second * time.Duration(wait)): | |
w.Write([]byte(id)) | |
return | |
case <-ctx.Done(): | |
log.Printf("backend: request was canceled.") | |
writeError(w, 499, "request canceled") | |
return | |
} | |
} | |
func render(w http.ResponseWriter, req *http.Request) { | |
// generate a new context with a cancelFunc that we can call | |
reqContext, cancel := context.WithCancel(req.Context()) | |
resp := make([]byte, 0) | |
var wg sync.WaitGroup | |
// channel for our results from remote requests | |
c := make(chan struct { | |
data []byte | |
err error | |
}, 1) | |
// make 4 concurrent remote requests | |
for i := 1; i < 5; i++ { | |
wg.Add(1) | |
go func(i int) { | |
defer wg.Done() | |
// call our remote servers. We pass our context and cancelFunc | |
// if getRemote encounters an error it will call cancel() | |
// all other parallel getRemote() calls will then see that | |
// the context has been canceled and can abort. | |
data, err := getRemote(reqContext, cancel, i) | |
pack := struct { | |
data []byte | |
err error | |
}{[]byte(fmt.Sprintf("%d: %s\n", i, data)), err} | |
// push our response onto the channel | |
c <- pack | |
}(i) | |
} | |
// wait for all getRemote() calls to complete, then close our results channel | |
go func() { | |
wg.Wait() | |
close(c) | |
}() | |
// iterate over our results | |
for r := range c { | |
if r.err != nil { | |
log.Printf("frontend: work returned error. %s", r.err.Error()) | |
writeError(w, http.StatusInternalServerError, r.err.Error()) | |
return | |
} | |
resp = append(resp, r.data...) | |
} | |
w.Write(resp) | |
} | |
func getRemote(ctx context.Context, cancel context.CancelFunc, i int) ([]byte, error) { | |
req, _ := http.NewRequest("GET", fmt.Sprintf("http://localhost:1111/getdata?id=%d", i), nil) | |
// anonymous struct to pack and unpack data in the channel | |
c := make(chan struct { | |
r *http.Response | |
err error | |
}, 1) | |
// make our http request in a separte goroutine | |
go func() { | |
log.Printf("getRemote: Doing http request with id=%d", i) | |
resp, err := client.Do(req) | |
pack := struct { | |
r *http.Response | |
err error | |
}{resp, err} | |
c <- pack | |
}() | |
// wait for either our results from the http request or if out context has been canceled | |
// then abort the http request. | |
select { | |
case <-ctx.Done(): | |
log.Printf("getRemote: peer request was cancled. termintion job with id=%d", i) | |
tr.CancelRequest(req) | |
<-c // Wait for client.Do | |
return nil, nil | |
case ok := <-c: | |
err := ok.err | |
resp := ok.r | |
if err != nil { | |
log.Printf("getRemote: request id=%d failed. canceling peer requets.", i) | |
cancel() | |
return nil, err | |
} | |
defer resp.Body.Close() | |
out, _ := ioutil.ReadAll(resp.Body) | |
if resp.StatusCode >= 300 { | |
log.Printf("getRemote: job id=%d got non 200 response. %s", i, out) | |
return out, fmt.Errorf(resp.Status) | |
} | |
fmt.Printf("getRemote: job with id=%d got Response: %s\n", i, out) | |
return out, err | |
} | |
} | |
func writeError(w http.ResponseWriter, code int, err string) { | |
w.WriteHeader(code) | |
w.Write([]byte(err)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment