Skip to content

Instantly share code, notes, and snippets.

@guzmonne
Last active July 10, 2019 03:44
Show Gist options
  • Save guzmonne/07da3cb470fde51851ec9bb13fcca5f0 to your computer and use it in GitHub Desktop.
Save guzmonne/07da3cb470fde51851ec9bb13fcca5f0 to your computer and use it in GitHub Desktop.
Web Server implementing queues with Go
package main
import (
"runtime"
"context"
_ "expvar"
"flag"
"fmt"
"sync/atomic"
"log"
"net/http"
_ "net/http/pprof"
"time"
"os"
"os/signal"
"github.com/json-iterator/go"
)
// json routes this file's json.* calls (for example json.NewDecoder in data)
// through jsoniter's ConfigCompatibleWithStandardLibrary configuration.
var json = jsoniter.ConfigCompatibleWithStandardLibrary
// Location is the location of a Meraki device observation. It is decoded from
// the "location" object of an Observation.
// NOTE(review): field meanings below are inferred from their names — confirm
// against the Meraki payload documentation.
type Location struct {
Lat float64 `json:"lat,omitempty"` // presumably latitude
Lng float64 `json:"lng,omitempty"` // presumably longitude
Unc float64 `json:"unc,omitempty"` // presumably the uncertainty of the position
X []float64 `json:"x,omitempty"`
Y []float64 `json:"y,omitempty"`
}
// Observation is a single Meraki device observation, one element of the
// "observations" array in DevicesSeenData.
type Observation struct {
IPv4 string `json:"ipv4,omitempty"`
// NOTE(review): omitempty has no effect on a struct-typed field in
// encoding/json; confirm whether it is needed here.
Location Location `json:"location,omitempty"`
SeenTime string `json:"seenTime,omitempty"`
SSID string `json:"ssid,omitempty"`
OS string `json:"os,omitempty"`
ClientMac string `json:"clientMac,omitempty"` // printed by processBody for every observation
SeenEpoch int64 `json:"seenEpoch,omitempty"` // presumably a Unix timestamp — TODO confirm the unit
RSSI int `json:"rssi,omitempty"`
IPv6 string `json:"ipv6,omitempty"`
Manufacturer string `json:"manufacturer,omitempty"`
}
// DevicesSeenData is the "data" object of a Meraki devices-seen message: the
// reporting access point plus the observations it made.
type DevicesSeenData struct {
ApMac string `json:"apMac,omitempty"`
ApTags []string `json:"apTags,omitempty"`
ApFloors []string `json:"apFloors,omitempty"`
Observations []Observation `json:"observations,omitempty"` // iterated by processBody
}
// DevicesSeen is the top-level Meraki devices-seen message that data decodes
// from the body of a POST request.
type DevicesSeen struct {
Version string `json:"version,omitempty"`
// NOTE(review): nothing in the code shown compares Secret against an
// expected value — confirm whether the sender is validated elsewhere.
Secret string `json:"secret,omitempty"`
Type string `json:"type,omitempty"`
Data DevicesSeenData `json:"data,omitempty"`
}
// job is the unit of work sent over the jobs channel from the HTTP handler to
// the workers; it wraps one decoded DevicesSeen message.
type job struct {
devicesSeen DevicesSeen
}
// Package-level state shared between main and the HTTP handlers.
var (
// healthy is the flag reported by healthz. main stores 1 right before the
// server starts listening and 0 when shutdown begins; every access in the
// code shown goes through sync/atomic.
healthy int32
// logger is a package-level logger.
// NOTE(review): main declares a local variable of the same name with :=,
// so nothing in the code shown ever assigns this one (it stays nil).
logger *log.Logger
)
// main parses the command-line flags, starts the worker pool and serves HTTP
// until an interrupt signal triggers a graceful shutdown.
//
// Flags:
//
//	-max_queue_size  capacity of the buffered job channel (default 100)
//	-max_workers     number of worker goroutines to start (default 5)
//	-port            TCP port to listen on, bound to 0.0.0.0 (default "8080")
//
// The process exits through logger.Fatalf when a flag value is invalid, when
// the listener cannot be started, or when the graceful shutdown does not
// finish within 30 seconds.
func main() {
	var (
		maxQueueSize = flag.Int("max_queue_size", 100, "The size of the job queue")
		maxWorkers   = flag.Int("max_workers", 5, "The number of workers to start")
		port         = flag.String("port", "8080", "The server port")
	)
	flag.Parse()

	// Logger configuration.
	// NOTE(review): this := declares a local that shadows the package-level
	// logger variable, which is therefore never assigned by main.
	logger := log.New(os.Stdout, "http: ", log.LstdFlags)
	logger.Println("Server is starting...")

	// Reject flag values that would only fail later and less obviously: a
	// negative capacity makes make(chan) panic, and without at least one
	// worker nothing would ever receive from the job queue.
	if *maxQueueSize < 0 {
		logger.Fatalf("Invalid max_queue_size %d: must not be negative\n", *maxQueueSize)
	}
	if *maxWorkers < 1 {
		logger.Fatalf("Invalid max_workers %d: must be at least 1\n", *maxWorkers)
	}

	// GOMAXPROCS already defaults to the number of usable CPUs (Go 1.5+), so it
	// is not forced here; that would also override the GOMAXPROCS environment
	// variable. GOMAXPROCS(0) only reports the value in effect.
	logger.Println("GOMAXPROCS =", runtime.GOMAXPROCS(0))
	logger.Println("maxWorkers =", *maxWorkers)
	logger.Println("maxQueueSize =", *maxQueueSize)

	// Create job channel.
	jobs := make(chan job, *maxQueueSize)

	// Create workers. The jobs channel is never closed, so the workers run
	// until the process exits and jobs still queued at shutdown are dropped.
	for index := 1; index <= *maxWorkers; index++ {
		logger.Println("Starting worker #", index)
		go func(index int) {
			for j := range jobs {
				processBody(index, j)
			}
		}(index)
	}

	// Router configuration.
	// NOTE(review): the blank imports of expvar and net/http/pprof register
	// their handlers on http.DefaultServeMux, which this server does not use —
	// confirm whether the /debug/ endpoints are meant to be reachable.
	router := http.NewServeMux()
	router.Handle("/", handler(jobs))
	router.Handle("/healthz", healthz())

	// Server configuration.
	addr := "0.0.0.0:" + *port
	server := &http.Server{
		Addr:         addr,
		Handler:      logging(logger)(router),
		ErrorLog:     logger,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  15 * time.Second,
	}

	// done is closed once the graceful shutdown has completed; quit receives
	// the interrupt signal.
	done := make(chan struct{})
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)

	// Launch quit handler goroutine.
	go func() {
		<-quit
		logger.Println("Server is shutting down...")
		// Report unhealthy first so healthz answers 503 while draining.
		atomic.StoreInt32(&healthy, 0)

		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		server.SetKeepAlivesEnabled(false)
		if err := server.Shutdown(ctx); err != nil {
			logger.Fatalf("Could not gracefully shutdown the server: %v\n", err)
		}
		close(done)
	}()

	// Run server. ListenAndServe returns http.ErrServerClosed as soon as
	// Shutdown is called, which is the expected exit path.
	logger.Println("Server is ready to handle requests at", addr)
	atomic.StoreInt32(&healthy, 1)
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		logger.Fatalf("Could not listen on %s: %v\n", addr, err)
	}

	// Wait for the shutdown goroutine to finish draining connections.
	<-done
	logger.Println("Server stopped")
}
// healthz returns the health-check handler. It answers 204 No Content while
// the package-level healthy flag is 1 and 503 Service Unavailable otherwise.
func healthz() http.Handler {
	check := func(w http.ResponseWriter, _ *http.Request) {
		status := http.StatusServiceUnavailable
		if atomic.LoadInt32(&healthy) == 1 {
			status = http.StatusNoContent
		}
		w.WriteHeader(status)
	}
	return http.HandlerFunc(check)
}
// handler returns the root handler, which dispatches on the request method:
// GET goes to validator, POST goes to data (which queues work on jobs), and
// every other method is answered with 404 Not Found.
func handler(jobs chan job) http.Handler {
	dispatch := func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodGet:
			validator(w, r)
		case http.MethodPost:
			data(w, r, jobs)
		default:
			http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
		}
	}
	return http.HandlerFunc(dispatch)
}
// validator answers a GET request with 200 OK and a fixed token as a
// plain-text body, followed by a newline. The request itself is not inspected.
func validator(w http.ResponseWriter, r *http.Request) {
	const token = "da6a17c407bb11dfeec7392a5042be0a4cc034b6"

	headers := w.Header()
	headers.Set("Content-Type", "text/plain; charset=utf-8")
	headers.Set("X-Content-Type-Options", "nosniff")

	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "%s\n", token)
}
// data handles a POST carrying a Meraki devices-seen message: it decodes the
// JSON request body into a DevicesSeen and queues it on jobs for the workers.
//
// Responses:
//   - 400 Bad Request when the body cannot be decoded;
//   - 503 Service Unavailable when the job queue is full;
//   - 202 Accepted once the job has been queued.
func data(w http.ResponseWriter, r *http.Request, jobs chan job) {
	var devicesSeen DevicesSeen
	if err := json.NewDecoder(r.Body).Decode(&devicesSeen); err != nil {
		// A malformed body is a client error: answer 400 rather than
		// panicking, which would abort the request before any response.
		http.Error(w, "Bad request - Can't Decode!", http.StatusBadRequest)
		return
	}

	// Queue the job without blocking and without spawning a goroutine per
	// request: the channel capacity is the bound on pending work, so a full
	// queue is reported to the client instead of piling up in memory.
	select {
	case jobs <- job{devicesSeen}:
		fmt.Printf("Added job\n")
		w.WriteHeader(http.StatusAccepted)
	default:
		http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
	}
}
// processBody is the work a worker performs for one job: it sleeps for two
// seconds, prints "Done", then prints the ClientMac of every observation in
// the job's payload. The id argument (the worker index) is currently unused.
func processBody(id int, j job) {
	time.Sleep(2 * time.Second)
	fmt.Println("Done")

	seen := j.devicesSeen.Data.Observations
	for i := range seen {
		fmt.Println(seen[i].ClientMac)
	}
}
// logging builds a middleware that writes one line to logger per request,
// after the wrapped handler returns: method, URL path, remote address, user
// agent and the time spent handling the request.
func logging(logger *log.Logger) func(http.Handler) http.Handler {
	wrap := func(next http.Handler) http.Handler {
		timed := func(w http.ResponseWriter, r *http.Request) {
			// Deferred so the line is written even if next panics; the
			// start time is captured when the defer statement runs.
			defer func(began time.Time) {
				logger.Println(r.Method, r.URL.Path, r.RemoteAddr, r.UserAgent(), time.Since(began))
			}(time.Now())
			next.ServeHTTP(w, r)
		}
		return http.HandlerFunc(timed)
	}
	return wrap
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment