Last active
April 9, 2021 19:10
-
-
Save fionera/fdc469b4ed608502227d252c6c03dbc5 to your computer and use it in GitHub Desktop.
Pr0gramm Stocks Crawler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"time" | |
"github.com/dgraph-io/badger/v3" | |
"github.com/go-redis/redis/v8" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/prometheus/client_golang/prometheus/promhttp" | |
) | |
// addr is the address the HTTP server binds to. It is set through the
// -listen-address command-line flag and defaults to ":8080".
var addr = flag.String(
	"listen-address",
	":8080",
	"The address to listen on for HTTP requests.",
)
// Names shared by the exported Prometheus metrics and the Redis hashes that
// mirror their latest values.
const (
	// stockPriceCurrentName names the metric/hash holding current prices.
	stockPriceCurrentName = "stock_price_current"

	// stockPriceLastName names the metric/hash holding last prices.
	stockPriceLastName = "stock_price_last"

	// stockCirculatingName names the metric/hash holding circulating amounts.
	stockCirculatingName = "stock_circulating"
)
var ( | |
stockPriceCurrent = prometheus.NewDesc(stockPriceCurrentName, "The current price of a Stock", []string{"stock"}, nil) | |
stockPriceLast = prometheus.NewDesc(stockPriceLastName, "The last price of a Stock", []string{"stock"}, nil) | |
stockCirculating = prometheus.NewDesc(stockCirculatingName, "The amount of circulating stocks", []string{"stock"}, nil) | |
) | |
type market struct { | |
rdb *redis.Client | |
db *badger.DB | |
} | |
func (m *market) Describe(descs chan<- *prometheus.Desc) { | |
descs <- stockPriceCurrent | |
descs <- stockPriceLast | |
} | |
func (m *market) Collect(metrics chan<- prometheus.Metric) { | |
prices, err := m.requestPrices() | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
for name, stock := range prices.Prices { | |
m.rdb.HSet(context.Background(), stockPriceCurrentName, name, stock.Cur) | |
m.rdb.HSet(context.Background(), stockPriceLastName, name, stock.Last) | |
m.rdb.HSet(context.Background(), stockCirculatingName, name, stock.Circulating) | |
metrics <- prometheus.MustNewConstMetric(stockPriceCurrent, prometheus.GaugeValue, stock.Cur, name) | |
metrics <- prometheus.MustNewConstMetric(stockPriceLast, prometheus.GaugeValue, stock.Last, name) | |
metrics <- prometheus.MustNewConstMetric(stockCirculating, prometheus.GaugeValue, stock.Circulating, name) | |
if _, err := m.requestOrderBook(name); err != nil { | |
log.Println(err) | |
} | |
if _, err := m.requestTrades(name); err != nil { | |
log.Println(err) | |
} | |
} | |
} | |
// Stock is one entry of the price list returned by the stocks API.
type Stock struct {
	// Cur is the current price of the stock.
	Cur float64 `json:"cur"`
	// Last is the last price of the stock.
	Last float64 `json:"last"`
	// Circulating is the amount of circulating stocks.
	Circulating float64 `json:"circulating"`
}
type Prices struct { | |
Prices map[string]Stock `json:"prices"` | |
Ts int `json:"ts"` | |
} | |
// Order is a single entry of an order book side or of a trade list.
//
// NOTE(review): the field meanings are inferred from their JSON names; verify
// against the API.
type Order struct {
	// Created is presumably the creation timestamp of the entry.
	Created float64 `json:"created"`
	// Outstanding is presumably the amount that is still open.
	Outstanding float64 `json:"outstanding"`
	// Price is the price attached to the entry.
	Price float64 `json:"price"`
}
// Spread carries the bid/ask figures that the API attaches to order book and
// trade responses.
type Spread struct {
	// Bid is the value of the "bid" field.
	Bid float64 `json:"bid"`
	// Ask is the value of the "ask" field.
	Ask float64 `json:"ask"`
	// Spread is the value of the "spread" field as reported by the API.
	Spread float64 `json:"spread"`
}
type Trades struct { | |
Spread Spread `json:"spread"` | |
Trades []Order `json:"trades"` | |
Ts int `json:"ts"` | |
} | |
type OrderBook struct { | |
Spread Spread `json:"spread"` | |
Orders struct { | |
Buy []Order `json:"buy"` | |
Sell []Order `json:"sell"` | |
} `json:"orders"` | |
Ts int `json:"ts"` | |
} | |
func (m *market) requestPrices() (*Prices, error) { | |
resp, err := http.Get("https://pr0gramm.com/api/stocks/prices") | |
if err != nil { | |
return nil, err | |
} | |
defer resp.Body.Close() | |
data, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return nil, err | |
} | |
var p Prices | |
if err := json.NewDecoder(bytes.NewReader(data)).Decode(&p); err != nil { | |
return nil, err | |
} | |
err = m.db.Update(func(txn *badger.Txn) error { | |
key := fmt.Sprintf("prices:%d", p.Ts) | |
if err := txn.Set([]byte(key), data); err != nil { | |
return err | |
} | |
return nil | |
}) | |
if err != nil { | |
log.Println(err) | |
} | |
return &p, nil | |
} | |
func (m *market) requestOrderBook(s string) (*OrderBook, error) { | |
resp, err := http.Get("https://pr0gramm.com/api/stocks/orderbook?symbol=" + s) | |
if err != nil { | |
return nil, err | |
} | |
defer resp.Body.Close() | |
data, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return nil, err | |
} | |
var o OrderBook | |
if err := json.NewDecoder(bytes.NewReader(data)).Decode(&o); err != nil { | |
return nil, err | |
} | |
err = m.db.Update(func(txn *badger.Txn) error { | |
key := fmt.Sprintf("order_book:%s:%d", s, o.Ts) | |
if err := txn.Set([]byte(key), data); err != nil { | |
return err | |
} | |
return nil | |
}) | |
if err != nil { | |
log.Println(err) | |
} | |
return &o, nil | |
} | |
func (m *market) requestTrades(s string) (*Trades, error) { | |
resp, err := http.Get("https://pr0gramm.com/api/stocks/trades?symbol=" + s) | |
if err != nil { | |
return nil, err | |
} | |
defer resp.Body.Close() | |
data, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return nil, err | |
} | |
var t Trades | |
if err := json.NewDecoder(bytes.NewReader(data)).Decode(&t); err != nil { | |
return nil, err | |
} | |
err = m.db.Update(func(txn *badger.Txn) error { | |
key := fmt.Sprintf("trades:%s:%d", s, t.Ts) | |
if err := txn.Set([]byte(key), data); err != nil { | |
return err | |
} | |
return nil | |
}) | |
if err != nil { | |
log.Println(err) | |
} | |
return &t, nil | |
} | |
func main() { | |
flag.Parse() | |
db, err := badger.Open(badger.DefaultOptions(".")) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer db.Close() | |
go func() { | |
ticker := time.NewTicker(5 * time.Minute) | |
defer ticker.Stop() | |
for range ticker.C { | |
again: | |
err := db.RunValueLogGC(0.7) | |
if err == nil { | |
goto again | |
} | |
} | |
}() | |
rdb := redis.NewClient(&redis.Options{ | |
Addr: "172.17.0.1:6379", | |
}) | |
if err := prometheus.Register(&market{rdb, db}); err != nil { | |
log.Fatal(err) | |
} | |
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { | |
if req.URL.Path == "/" { | |
err := db.View(func(txn *badger.Txn) error { | |
opts := badger.DefaultIteratorOptions | |
opts.PrefetchSize = 10 | |
it := txn.NewIterator(opts) | |
defer it.Close() | |
for it.Rewind(); it.Valid(); it.Next() { | |
item := it.Item() | |
k := item.Key() | |
w.Write(k) | |
w.Write([]byte("\n")) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
}) | |
if err != nil { | |
log.Println(err) | |
} | |
return | |
} | |
err := db.View(func(txn *badger.Txn) error { | |
it, err := txn.Get([]byte(req.URL.Path[1:])) | |
if err != nil { | |
return err | |
} | |
v, err := it.ValueCopy(nil) | |
if err != nil { | |
return err | |
} | |
w.Write(v) | |
return nil | |
}) | |
if err != nil && err != badger.ErrKeyNotFound { | |
log.Println(err) | |
} | |
return | |
}) | |
http.Handle("/metrics", promhttp.Handler()) | |
log.Fatal(http.ListenAndServe(*addr, nil)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment