Last active
July 10, 2019 03:44
-
-
Save guzmonne/07da3cb470fde51851ec9bb13fcca5f0 to your computer and use it in GitHub Desktop.
Web Server implementing queues with Go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"runtime" | |
"context" | |
_ "expvar" | |
"flag" | |
"fmt" | |
"sync/atomic" | |
"log" | |
"net/http" | |
_ "net/http/pprof" | |
"time" | |
"os" | |
"os/signal" | |
"github.com/json-iterator/go" | |
) | |
var json = jsoniter.ConfigCompatibleWithStandardLibrary | |
// Location : Meraki device observation location | |
type Location struct { | |
Lat float64 `json:"lat,omitempty"` | |
Lng float64 `json:"lng,omitempty"` | |
Unc float64 `json:"unc,omitempty"` | |
X []float64 `json:"x,omitempty"` | |
Y []float64 `json:"y,omitempty"` | |
} | |
// Observation : Meraki device observation | |
type Observation struct { | |
IPv4 string `json:"ipv4,omitempty"` | |
Location Location `json:"location,omitempty"` | |
SeenTime string `json:"seenTime,omitempty"` | |
SSID string `json:"ssid,omitempty"` | |
OS string `json:"os,omitempty"` | |
ClientMac string `json:"clientMac,omitempty"` | |
SeenEpoch int64 `json:"seenEpoch,omitempty"` | |
RSSI int `json:"rssi,omitempty"` | |
IPv6 string `json:"ipv6,omitempty"` | |
Manufacturer string `json:"manufacturer,omitempty"` | |
} | |
// DevicesSeenData : Meraki devices seen data | |
type DevicesSeenData struct { | |
ApMac string `json:"apMac,omitempty"` | |
ApTags []string `json:"apTags,omitempty"` | |
ApFloors []string `json:"apFloors,omitempty"` | |
Observations []Observation `json:"observations,omitempty"` | |
} | |
// DevicesSeen : Meraki devices seen | |
type DevicesSeen struct { | |
Version string `json:"version,omitempty"` | |
Secret string `json:"secret,omitempty"` | |
Type string `json:"type,omitempty"` | |
Data DevicesSeenData `json:"data,omitempty"` | |
} | |
type job struct { | |
devicesSeen DevicesSeen | |
} | |
var ( | |
healthy int32 | |
logger *log.Logger | |
) | |
func main() { | |
var ( | |
maxQueueSize = flag.Int("max_queue_size", 100, "The size of the job queue") | |
maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") | |
port = flag.String("port", "8080", "The server port") | |
) | |
flag.Parse() | |
// Logger configuration | |
logger := log.New(os.Stdout, "http: ", log.LstdFlags) | |
logger.Println("Server is starting...") | |
// Configure concurrent jobs | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
logger.Println("GOMAXPROCS =", runtime.NumCPU()) | |
logger.Println("maxWorkers =", *maxWorkers) | |
logger.Println("maxQueueSize =", *maxQueueSize) | |
// Create job channel | |
jobs := make(chan job, *maxQueueSize) | |
// Create workers | |
for index := 1; index <= *maxWorkers; index++ { | |
logger.Println("Starting worker #", index) | |
go func(index int) { | |
for job := range jobs { | |
processBody(index, job) | |
} | |
}(index) | |
} | |
// Router configuration | |
router := http.NewServeMux() | |
router.Handle("/", handler(jobs)) | |
router.Handle("/healthz", healthz()) | |
// Server configuration | |
server := &http.Server{ | |
Addr: "0.0.0.0:" + *port, | |
Handler: logging(logger)(router), | |
ErrorLog: logger, | |
ReadTimeout: 5 * time.Second, | |
WriteTimeout: 10 * time.Second, | |
IdleTimeout: 15 * time.Second, | |
} | |
// Configure done and quit handlers | |
done := make(chan bool) | |
quit := make(chan os.Signal, 1) | |
signal.Notify(quit, os.Interrupt) | |
// Launch quit handler goroutine | |
go func() { | |
<-quit | |
logger.Println("Server is shutting down...") | |
atomic.StoreInt32(&healthy, 0) | |
ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second) | |
defer cancel() | |
server.SetKeepAlivesEnabled(false) | |
if err := server.Shutdown(ctx); err != nil { | |
logger.Fatalf("Could not gracefully shutdown the server: %v\n", err) | |
} | |
close(done) | |
}() | |
// Run server | |
logger.Println("Server is ready to handle requests at", "0.0.0.0:" + *port) | |
atomic.StoreInt32(&healthy, 1) | |
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { | |
logger.Fatalf("Could not listen on %s: %v\n", "0.0.0.0:" + *port, err) | |
} | |
// Handle done | |
<-done | |
logger.Println("Server stopped") | |
} | |
func healthz() http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
if atomic.LoadInt32(&healthy) == 1 { | |
w.WriteHeader(http.StatusNoContent) | |
return | |
} | |
w.WriteHeader(http.StatusServiceUnavailable) | |
}) | |
} | |
func handler(jobs chan job) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodGet && r.Method != http.MethodPost { | |
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) | |
return | |
} | |
if r.Method == http.MethodGet { | |
validator(w, r) | |
return | |
} | |
if r.Method == http.MethodPost { | |
data(w, r, jobs) | |
return | |
} | |
}) | |
} | |
func validator(w http.ResponseWriter, r *http.Request) { | |
w.Header().Set("Content-Type", "text/plain; charset=utf-8") | |
w.Header().Set("X-Content-Type-Options", "nosniff") | |
w.WriteHeader(http.StatusOK) | |
fmt.Fprintln(w, "da6a17c407bb11dfeec7392a5042be0a4cc034b6") | |
} | |
func data(w http.ResponseWriter, r *http.Request, jobs chan job) { | |
var devicesSeen DevicesSeen | |
decoder := json.NewDecoder(r.Body) | |
err := decoder.Decode(&devicesSeen) | |
if err != nil { | |
panic(err) | |
http.Error(w, "Bad request - Can't Decode!", 400) | |
return | |
} | |
// Create Job and push the work into the Job Channel | |
go func() { | |
fmt.Printf("Added job\n") | |
jobs <- job{devicesSeen} | |
}() | |
// Render success | |
w.WriteHeader(http.StatusAccepted) | |
} | |
func processBody(id int, j job) { | |
time.Sleep(2 * time.Second) | |
fmt.Println("Done") | |
for _, observation := range j.devicesSeen.Data.Observations { | |
fmt.Println(observation.ClientMac) | |
} | |
} | |
func logging(logger *log.Logger) func(http.Handler) http.Handler { | |
return func(next http.Handler) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
start := time.Now() | |
defer func() { | |
logger.Println(r.Method, r.URL.Path, r.RemoteAddr, r.UserAgent(), time.Since(start),) | |
}() | |
next.ServeHTTP(w, r) | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment