Skip to content

Instantly share code, notes, and snippets.

@shirou
Last active October 19, 2019 13:13
Show Gist options
  • Save shirou/86c1f1db08c4050aec29371023ab4b56 to your computer and use it in GitHub Desktop.
Save shirou/86c1f1db08c4050aec29371023ab4b56 to your computer and use it in GitHub Desktop.
marketstore dummy plugin
package main
import (
"context"
"math/rand"
"time"
)
// DummySubscriber is an dummy
type DummySubscriber struct {
config map[string]interface{}
ctx context.Context
writerChannel chan Ticks
}
// Run starts subscribe
func (sub *DummySubscriber) Run() {
rand.Seed(time.Now().Unix())
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ticks := sub.getDummyTicks(1)
sub.writerChannel <- ticks
}
}
}
func (sub *DummySubscriber) getDummyTicks(count int) Ticks {
symbols := []string{"USDJPY", "EURUSD", "EURJPY"}
ticks := make(Ticks, count)
for i := 0; i < count; i++ {
ticks[i] = &Tick{
sym: symbols[rand.Intn(len(symbols))],
askPrices: []float32{float32(rand.Intn(160))},
time: time.Now(),
}
}
return ticks
}
package main
import (
"context"
"github.com/alpacahq/marketstore/plugins/bgworker"
"github.com/alpacahq/marketstore/utils/log"
)
const maxWriteChannelBufferSize = 100000
// NewBgWorker returns marketsoter bgworker plugin
func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
log.Info("loaded Subscribe config...")
writerChannel := make(chan Ticks, maxWriteChannelBufferSize)
writer, err := NewWriter(writerChannel, "1Sec")
if err != nil {
log.Error("failed create writer: %v", err)
return nil, err
}
ctx := context.Background()
go writer.Run(ctx)
return &DummySubscriber{
config: conf,
ctx: ctx,
writerChannel: writerChannel,
}, nil
}
func main() {
log.Error("kdb writer main invoked")
}
import os
import pandas as pd
import pymarketstore as pymkts
now = pd.Timestamp.utcnow()
endpoint = os.environ.get("MARKETSTORE_ENDPOINT", "http://localhost:5993/rpc")
mkts = pymkts.Client(endpoint)
now = pd.Timestamp.utcnow()
param = pymkts.Param(
'USDJPY',
"1Sec",
"TICK",
start=now - pd.Timedelta('1D'),
end=now,
)
a = mkts.query(param).first().df()
print(a)
package main
import (
"time"
"github.com/alpacahq/marketstore/utils/io"
)
// Tick represent a tick
type Tick struct {
time time.Time
sym string
askPrices []float32
}
// Ticks is an array of Tick
type Ticks []*Tick
func (ticks Ticks) convertToCSM(timeFrame string) (io.ColumnSeriesMap, error) {
csm := io.NewColumnSeriesMap()
for _, tick := range ticks {
timeBucketKey := io.NewTimeBucketKeyFromString(tick.sym + "/" + timeFrame + "/TICK")
cs, ok := csm[*timeBucketKey]
if !ok {
cs = io.NewColumnSeries()
}
cs.AddColumn("Epoch", []int64{tick.time.UnixNano()})
cs.AddColumn("L1-Askprice", getL1Price(tick.askPrices))
csm[*timeBucketKey] = cs
}
return csm, nil
}
func getL1Price(prices []float32) []float32 {
if len(prices) == 0 {
return []float32{0}
}
return []float32{prices[0]}
}
func getL1Int(value []int) []int {
if len(value) == 0 {
return []int{0}
}
return []int{value[0]}
}
package main
import (
"context"
"fmt"
"github.com/alpacahq/marketstore/executor"
"github.com/alpacahq/marketstore/utils/io"
"github.com/alpacahq/marketstore/utils/log"
)
// Writer writes Ticks to marketstore
type Writer struct {
ch chan Ticks
timeFrame string
}
// NewWriter returns new Writer
func NewWriter(ch chan Ticks, timeFrame string) (*Writer, error) {
return &Writer{
ch: ch,
timeFrame: timeFrame,
}, nil
}
// Run start go routine and wait ticks comes
func (w *Writer) Run(ctx context.Context) {
for {
select {
case ticks, ok := <-w.ch:
if !ok {
return
}
csm, err := ticks.convertToCSM(w.timeFrame)
if err != nil {
log.Error("ToCSM error %v", err)
continue
}
if err := w.write(csm); err != nil {
log.Error("write error %v", err)
}
case <-ctx.Done():
return
}
}
}
func (w *Writer) write(csm io.ColumnSeriesMap) error {
log.Debug("got CSM: %v", csm)
if len(csm) == 0 {
log.Info("CSM is empty")
return nil
}
// Note: isVariables always true because we use Tick
if err := executor.WriteCSM(csm, true); err != nil {
return fmt.Errorf("failed to write CSM %v", err)
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment