Skip to content

Instantly share code, notes, and snippets.

@peterhellberg
Created January 16, 2018 15:02
Show Gist options
  • Save peterhellberg/83b6a1325f0966824d7486d7ce242a5e to your computer and use it in GitHub Desktop.
Save peterhellberg/83b6a1325f0966824d7486d7ce242a5e to your computer and use it in GitHub Desktop.
A pretty minimal work queue
package main
import (
"log"
"net/http"
"os"
"time"
)
// main wires up a Server with a stdout logger and a 100-slot job queue,
// starts one background worker and serves HTTP on port 2018.
func main() {
	logger := log.New(os.Stdout, "", 0)

	s := NewServer(
		Log(logger),
		Queue(make(chan func(), 100)),
	)

	// A single worker goroutine drains the queue for the life of the process.
	go s.Work()

	hs := &http.Server{Addr: "0.0.0.0:2018", Handler: s}

	logger.Printf("Listening on http://%s\n", hs.Addr)

	// ListenAndServe always returns a non-nil error. Report it and exit
	// non-zero instead of terminating silently (e.g. when the port is taken).
	if err := hs.ListenAndServe(); err != nil {
		logger.Fatalf("ListenAndServe: %v", err)
	}
}
// Logger is the logging interface Server depends on; main and NewServer
// supply a *log.Logger for it.
type Logger interface {
// Printf receives a format string followed by its arguments.
Printf(format string, v ...interface{})
}
// Server is an http.Handler (see ServeHTTP) that also owns a queue of jobs,
// which Work executes.
type Server struct {
// logger receives request and job log lines.
logger Logger
// handler is assigned by register; ServeHTTP delegates to it.
handler http.Handler
// queue carries the jobs sent by index and received by Work.
queue chan func()
}
// Option configures a Server; NewServer applies each one in the order given.
type Option func(*Server)
// Log returns an Option that makes the Server write its log lines to l.
func Log(l Logger) Option {
	var setLogger Option = func(srv *Server) {
		srv.logger = l
	}

	return setLogger
}
// Queue returns an Option that makes the Server use q as its job queue.
func Queue(q chan func()) Option {
	var setQueue Option = func(srv *Server) {
		srv.queue = q
	}

	return setQueue
}
// NewServer builds a Server that defaults to a stdout logger and a 10-slot
// job queue, applies the given options in order, and then registers the HTTP
// routes on a fresh ServeMux.
func NewServer(options ...Option) *Server {
	srv := new(Server)
	srv.logger = log.New(os.Stdout, "", 0)
	srv.queue = make(chan func(), 10)

	// Options run after the defaults are set, so they can override them.
	for i := range options {
		options[i](srv)
	}

	mux := http.NewServeMux()
	srv.register(mux)

	return srv
}
// ServeHTTP logs the request method and quoted path, then passes the request
// on to s.handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	method, path := r.Method, r.URL.Path
	s.logger.Printf("%s %q count#requests=1\n", method, path)

	s.handler.ServeHTTP(w, r)
}
// Work runs queued jobs one at a time, in the order they were received, and
// returns once the queue channel has been closed and drained.
//
// A nil job is logged and skipped: calling a nil func panics, which would
// take down the whole process along with the worker.
func (s *Server) Work() {
	for job := range s.queue {
		if job == nil {
			s.log("Skipping nil job")
			continue
		}

		job()
	}
}
// index enqueues a demo job (which sleeps for two seconds) and replies with
// 204 No Content once the queue has accepted the job.
//
// When the queue is full the handler waits for a free slot, but it gives up
// with 503 Service Unavailable as soon as the request is canceled, so
// abandoned requests do not pile up as blocked goroutines.
func (s *Server) index(w http.ResponseWriter, r *http.Request) {
	// Nanosecond resolution: with second resolution every job enqueued in
	// the same second received the same ID. Still time-based, so collisions
	// are merely far less likely, not impossible.
	jobID := time.Now().UnixNano()

	job := func() {
		s.log("Job %d: Started", jobID)
		s.log("Job %d: Sleeping", jobID)
		time.Sleep(2 * time.Second)
		s.log("Job %d: Finished", jobID)
	}

	select {
	case s.queue <- job:
		s.log("Job %d: Enqueued", jobID)
		w.WriteHeader(http.StatusNoContent)
	case <-r.Context().Done():
		s.log("Job %d: Dropped, request canceled before the queue had room", jobID)
		w.WriteHeader(http.StatusServiceUnavailable)
	}
}
// log appends a newline to format and forwards it, together with v, to the
// Server's logger.
func (s *Server) log(format string, v ...interface{}) {
	line := format + "\n"
	s.logger.Printf(line, v...)
}
// register mounts the index handler on "/" and makes mux the handler that
// ServeHTTP delegates to.
func (s *Server) register(mux *http.ServeMux) {
	indexHandler := http.HandlerFunc(s.index)
	mux.Handle("/", indexHandler)

	s.handler = mux
}
@peterhellberg
Copy link
Author

Revised version below: task IDs now come from a globally unique ID generator (xid here; ulid etc. would work just as well) instead of the Unix timestamp.

package main

import (
	"log"
	"net/http"
	"os"
	"time"

	"github.com/rs/xid"
)

// main wires up a Server with a stdout logger and a 100-slot task queue,
// starts one background worker and serves HTTP on port 2019.
func main() {
	s := NewServer(
		WithLogger(log.New(os.Stdout, "", 0)),
		WithQueue(make(chan Task, 100)),
	)

	// A single worker goroutine drains the queue for the life of the process.
	go s.Work()

	hs := &http.Server{Addr: "0.0.0.0:2019", Handler: s}

	s.log("Listening on http://%s", hs.Addr)

	// ListenAndServe always returns a non-nil error. Report it and exit
	// non-zero instead of terminating silently (e.g. when the port is taken).
	if err := hs.ListenAndServe(); err != nil {
		s.log("ListenAndServe: %v", err)
		os.Exit(1)
	}
}

// Task is a unit of work carried on the Server queue.
type Task struct {
	// ID identifies the task in log lines.
	ID  string
	// Run performs the work; Work logs a non-nil return value as an error.
	Run func() error
}

// Logger is the logging interface Server depends on; main and NewServer
// supply a *log.Logger for it.
type Logger interface {
	// Printf receives a format string followed by its arguments.
	Printf(format string, v ...interface{})
}

// Server is an http.Handler (see ServeHTTP) that also owns a queue of
// Tasks, which Work executes.
type Server struct {
	// logger receives every line written through log.
	logger  Logger
	// handler is assigned by register; ServeHTTP delegates to it.
	handler http.Handler
	// queue carries the tasks sent by enqueueIndexTask and received by Work.
	queue   chan Task
}

// Option configures a Server; NewServer applies each one in the order given.
type Option func(*Server)

// WithLogger returns an Option that makes the Server write its log lines to l.
func WithLogger(l Logger) Option {
	var setLogger Option = func(srv *Server) {
		srv.logger = l
	}

	return setLogger
}

// WithQueue returns an Option that makes the Server use q as its task queue.
func WithQueue(q chan Task) Option {
	var setQueue Option = func(srv *Server) {
		srv.queue = q
	}

	return setQueue
}

// NewServer builds a Server that defaults to a stdout logger and a 100-slot
// task queue, applies the given options in order, and then registers the
// HTTP routes on a fresh ServeMux.
func NewServer(options ...Option) *Server {
	srv := new(Server)
	srv.logger = log.New(os.Stdout, "", 0)
	srv.queue = make(chan Task, 100)

	// Options run after the defaults are set, so they can override them.
	for i := range options {
		options[i](srv)
	}

	mux := http.NewServeMux()
	srv.register(mux)

	return srv
}

// ServeHTTP logs the request method and quoted path, then passes the request
// on to s.handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	method, path := r.Method, r.URL.Path
	s.log("%s %q count#requests=1", method, path)

	s.handler.ServeHTTP(w, r)
}

// Work runs queued tasks one at a time, in the order they were received,
// logging the start of each task and then either its error or its
// completion. It returns once the queue channel has been closed and drained.
//
// A task whose Run is nil is reported as an error and skipped: calling a nil
// func panics, which would take down the whole process along with the worker.
func (s *Server) Work() {
	for task := range s.queue {
		if task.Run == nil {
			s.log("[%s] Error %q", task.ID, "task has no Run function")
			continue
		}

		s.log("[%s] Started task", task.ID)

		if err := task.Run(); err != nil {
			s.log("[%s] Error %q", task.ID, err)
			continue
		}

		s.log("[%s] Finished task", task.ID)
	}
}

// enqueueIndexTask sends a demo task (log a line, sleep four seconds, report
// no error) to the queue and logs that it was enqueued. The send blocks
// while the queue is full.
func (s *Server) enqueueIndexTask() {
	// Use some globally unique id generator (xid, ulid, etc.)
	taskID := xid.New().String()

	sleep := func() error {
		s.log("[%s] Sleeping for 4s", taskID)

		time.Sleep(4 * time.Second)

		return nil
	}

	s.queue <- Task{ID: taskID, Run: sleep}

	s.log("[%s] Enqueued task", taskID)
}

// index enqueues the demo task and answers with 204 No Content.
func (s *Server) index(w http.ResponseWriter, r *http.Request) {
	const status = http.StatusNoContent

	s.enqueueIndexTask()

	w.WriteHeader(status)
}

// register mounts the index handler on "/" and makes mux the handler that
// ServeHTTP delegates to.
func (s *Server) register(mux *http.ServeMux) {
	indexHandler := http.HandlerFunc(s.index)
	mux.Handle("/", indexHandler)

	s.handler = mux
}

// log appends a newline to format and forwards it, together with v, to the
// Server's logger.
func (s *Server) log(format string, v ...interface{}) {
	line := format + "\n"
	s.logger.Printf(line, v...)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment