Skip to content

Instantly share code, notes, and snippets.

@zdunecki
Last active September 28, 2020 13:51
Show Gist options
  • Select an option

  • Save zdunecki/1f02e32dc23574849c4373cbe695585a to your computer and use it in GitHub Desktop.

Select an option

Save zdunecki/1f02e32dc23574849c4373cbe695585a to your computer and use it in GitHub Desktop.
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gomodule/redigo/redis"
)
// newPool builds the Redis connection pool used by the scheduler.
//
// The pool keeps at most 80 idle connections and hands out at most 12000
// connections at a time. Every connection is dialled over TCP to ":6379".
// A dial failure is returned to the pool instead of being raised as a panic,
// so the caller sees it as an error from the connection it obtained.
func newPool() *redis.Pool {
	return &redis.Pool{
		// Maximum number of idle connections in the pool.
		MaxIdle: 80,
		// Maximum number of connections allocated by the pool at a given time.
		MaxActive: 12000,
		// Dial is the application supplied function for creating and
		// configuring a connection.
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", ":6379")
		},
	}
}
func main() {
fmt.Println("TOPIFY SCHEDULER")
pool := newPool()
conn := pool.Get()
defer conn.Close()
subconn := redis.PubSubConn{Conn: pool.Get()}
subconn.Subscribe("jobRequest")
forever := make(chan string)
go func() {
for {
switch v := subconn.Receive().(type) {
case redis.Message:
type Data struct {
RequestID string `json:"requestId"`
Pros [][]Pro `json:"pros"`
}
var data Data
err := json.Unmarshal(v.Data, &data)
if err != nil {
panic(err)
}
router(conn, pool.Get(), data.RequestID, data.Pros)
case error:
panic(v)
}
}
}()
<-forever
}
// dx scales the base value b down by priority and truncates the result to an
// int: it returns int(b - b*priority). A base of 0 is replaced by 60.
//
// router uses the result as the number of seconds to wait before a group of
// pros is notified.
func dx(priority float32, b float32) int {
	base := b
	if base == 0 {
		base = 60
	}
	remaining := base - (base * priority)
	return int(remaining)
}
// balancerData is the JSON message that scheduler publishes on the "newJob"
// channel for one group of pros.
type balancerData struct {
	// RequestID is the identifier of the job request being scheduled.
	RequestID string `json:"requestId"`
	// OnlineProKeys holds the Redis keys that router found for the pros of
	// this group.
	OnlineProKeys []string `json:"onlineProKeys"`
	// PriorityPros is the group of pros this message is about.
	PriorityPros []Pro `json:"prioritizedPros"`
}
// Pro is one professional entry of a job request, as decoded from and
// encoded to JSON.
type Pro struct {
	// ID is carried through from the request payload unchanged.
	ID string `json:"id"`
	// UserID is used by router to build the Redis key pattern for the pro.
	UserID string `json:"userId"`
	// Priority is the factor dx uses to derive the group's notification delay.
	Priority float32 `json:"priority"`
}
// router schedules one "newJob" publication per group of pros in
// prioritizedPros.
//
// For every pro in a group it looks up the keys matching
// "socket:topify:pros:<userId>:*" on conn and collects them as the group's
// online keys. The first group, and any group without online keys, is
// scheduled without delay; every other group is delayed by
// dx(group[0].Priority, 10) seconds. Empty groups are skipped.
//
// Parameters:
//   - conn: connection used for the synchronous key lookups.
//   - pub: connection the scheduled publications are sent on. router takes
//     ownership of it and closes it once every scheduled publication has
//     finished, so a pooled connection is returned to its pool.
//   - requestID: identifier copied into every published message.
//   - prioritizedPros: groups of pros, in the order they should be notified.
//
// A failed key lookup panics.
//
// NOTE(review): KEYS walks the whole keyspace on the server; SCAN is the
// usual replacement if the key count grows — confirm against production size.
// NOTE(review): all scheduler goroutines started here share pub; confirm that
// concurrent Send/Flush calls on one connection are acceptable for the Redis
// client in use.
func router(conn redis.Conn, pub redis.Conn, requestID string, prioritizedPros [][]Pro) {
	// ctx, _ := context.WithCancel(context.Background()) if we'll need cancel
	ctx := context.Background()

	// done receives one value per finished scheduler goroutine.
	done := make(chan struct{})
	launched := 0

	for i, group := range prioritizedPros {
		// An empty group has no first pro to take the priority from and
		// nobody to notify.
		if len(group) == 0 {
			continue
		}

		var onlineProKeys []string
		for _, pro := range group {
			pattern := "socket:topify:pros:" + pro.UserID + ":*"
			keys, err := redis.Strings(conn.Do("KEYS", pattern))
			if err != nil {
				panic(err)
			}
			onlineProKeys = append(onlineProKeys, keys...)
		}

		// Only later groups that actually have someone online are delayed.
		var t time.Duration
		if i != 0 && len(onlineProKeys) != 0 {
			t = time.Duration(dx(group[0].Priority, 10))
		}

		data := balancerData{
			RequestID:     requestID,
			OnlineProKeys: onlineProKeys,
			PriorityPros:  group,
		}

		launched++
		go func(data balancerData, t time.Duration) {
			scheduler(ctx, pub, data, t)
			done <- struct{}{}
		}(data, t)
	}

	// Release pub only after the last scheduled publication has run; closing
	// it earlier would break the goroutines that are still waiting.
	go func(pending int) {
		for ; pending > 0; pending-- {
			<-done
		}
		// Nothing useful can be done with a close error at this point.
		_ = pub.Close()
	}(launched)
}
// scheduler waits t seconds and then publishes balancerData as JSON on the
// "newJob" channel using pub.
//
// Parameters:
//   - ctx: cancelling it aborts the wait; nothing is published in that case.
//   - pub: connection the PUBLISH command is sent and flushed on.
//   - balancerData: message to publish. Nothing is sent when it has no
//     online pro keys.
//   - t: delay expressed as a number of seconds (it is multiplied by
//     time.Second), not as an already scaled duration.
//
// Marshal, send and flush failures are reported on standard output and end
// the call without publishing.
func scheduler(
	ctx context.Context,
	pub redis.Conn,
	balancerData balancerData,
	t time.Duration,
) {
	// An explicit timer can be stopped, so it does not stay pending when the
	// context ends first.
	timer := time.NewTimer(time.Second * t)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-ctx.Done():
		fmt.Print("CTX END")
		return
	}

	if len(balancerData.OnlineProKeys) == 0 {
		fmt.Println("EMPTY")
		return
	}

	data, err := json.Marshal(balancerData)
	if err != nil {
		fmt.Println("encoding newJob message failed:", err)
		return
	}

	fmt.Println("SEND")
	if err := pub.Send("PUBLISH", "newJob", data); err != nil {
		fmt.Println("sending newJob message failed:", err)
		return
	}
	if err := pub.Flush(); err != nil {
		fmt.Println("flushing newJob message failed:", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment