Last active
September 28, 2020 13:51
-
-
Save zdunecki/1f02e32dc23574849c4373cbe695585a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "encoding/json" | |
| "fmt" | |
| "time" | |
| "github.com/gomodule/redigo/redis" | |
| ) | |
| func newPool() *redis.Pool { | |
| return &redis.Pool{ | |
| // Maximum number of idle connections in the pool. | |
| MaxIdle: 80, | |
| // max number of connections | |
| MaxActive: 12000, | |
| // Dial is an application supplied function for creating and | |
| // configuring a connection. | |
| Dial: func() (redis.Conn, error) { | |
| c, err := redis.Dial("tcp", ":6379") | |
| if err != nil { | |
| panic(err.Error()) | |
| } | |
| return c, err | |
| }, | |
| } | |
| } | |
| func main() { | |
| fmt.Println("TOPIFY SCHEDULER") | |
| pool := newPool() | |
| conn := pool.Get() | |
| defer conn.Close() | |
| subconn := redis.PubSubConn{Conn: pool.Get()} | |
| subconn.Subscribe("jobRequest") | |
| forever := make(chan string) | |
| go func() { | |
| for { | |
| switch v := subconn.Receive().(type) { | |
| case redis.Message: | |
| type Data struct { | |
| RequestID string `json:"requestId"` | |
| Pros [][]Pro `json:"pros"` | |
| } | |
| var data Data | |
| err := json.Unmarshal(v.Data, &data) | |
| if err != nil { | |
| panic(err) | |
| } | |
| router(conn, pool.Get(), data.RequestID, data.Pros) | |
| case error: | |
| panic(v) | |
| } | |
| } | |
| }() | |
| <-forever | |
| } | |
| func dx(priority float32, b float32) int { | |
| if b == 0 { | |
| b = 60 | |
| } | |
| return int(b - (b * priority)) | |
| } | |
| type balancerData struct { | |
| RequestID string `json:"requestId"` | |
| OnlineProKeys []string `json:"onlineProKeys"` | |
| PriorityPros []Pro `json:"prioritizedPros"` | |
| } | |
| type Pro struct { | |
| ID string `json:"id"` | |
| UserID string `json:"userId"` | |
| Priority float32 `json:"priority"` | |
| } | |
| func router(conn redis.Conn, pub redis.Conn, requestID string, prioritizedPros [][]Pro) { | |
| // ctx, _ := context.WithCancel(context.Background()) if we'll need cancel | |
| ctx := context.Background() | |
| for i, prioritizedPros := range prioritizedPros { | |
| d := dx(prioritizedPros[0].Priority, 10) | |
| var isAnyProOnline bool | |
| //var onlineProKeysChan = make(chan interface{}) | |
| var onlineProKeys []string | |
| for _, pro := range prioritizedPros { | |
| proKey := "socket:topify:pros:" + pro.UserID | |
| r, err := redis.Values(conn.Do("KEYS", proKey+":*")) | |
| if err != nil { | |
| panic(err) | |
| } | |
| if len(r) != 0 { | |
| isAnyProOnline = true | |
| } | |
| ris, _ := redis.Strings(r, nil) | |
| if len(ris) != 0 { | |
| onlineProKeys = append(onlineProKeys, ris...) | |
| } | |
| } | |
| var t time.Duration | |
| if i == 0 || !isAnyProOnline { | |
| t = 0 | |
| } else { | |
| t = time.Duration(d) | |
| } | |
| data := balancerData{ | |
| RequestID: requestID, | |
| OnlineProKeys: onlineProKeys, | |
| PriorityPros: prioritizedPros, | |
| } | |
| go scheduler( | |
| ctx, | |
| pub, | |
| data, | |
| t, | |
| ) | |
| } | |
| } | |
| func scheduler( | |
| ctx context.Context, | |
| pub redis.Conn, | |
| balancerData balancerData, | |
| t time.Duration, | |
| ) { | |
| select { | |
| case <-time.After(time.Second * t): | |
| if balancerData.OnlineProKeys == nil { | |
| fmt.Println("EMPTY") | |
| return | |
| } | |
| data, err := json.Marshal(balancerData) | |
| if err != nil { | |
| panic(err) | |
| } | |
| fmt.Println("SEND") | |
| pub.Send("PUBLISH", "newJob", data) | |
| pub.Flush() | |
| return | |
| case <-ctx.Done(): | |
| fmt.Print("CTX END") | |
| return | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment