Last active
October 19, 2019 13:13
-
-
Save shirou/86c1f1db08c4050aec29371023ab4b56 to your computer and use it in GitHub Desktop.
marketstore dummy plugin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"math/rand" | |
"time" | |
) | |
// DummySubscriber is an dummy | |
type DummySubscriber struct { | |
config map[string]interface{} | |
ctx context.Context | |
writerChannel chan Ticks | |
} | |
// Run starts subscribe | |
func (sub *DummySubscriber) Run() { | |
rand.Seed(time.Now().Unix()) | |
ticker := time.NewTicker(time.Second) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ticker.C: | |
ticks := sub.getDummyTicks(1) | |
sub.writerChannel <- ticks | |
} | |
} | |
} | |
func (sub *DummySubscriber) getDummyTicks(count int) Ticks { | |
symbols := []string{"USDJPY", "EURUSD", "EURJPY"} | |
ticks := make(Ticks, count) | |
for i := 0; i < count; i++ { | |
ticks[i] = &Tick{ | |
sym: symbols[rand.Intn(len(symbols))], | |
askPrices: []float32{float32(rand.Intn(160))}, | |
time: time.Now(), | |
} | |
} | |
return ticks | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"github.com/alpacahq/marketstore/plugins/bgworker" | |
"github.com/alpacahq/marketstore/utils/log" | |
) | |
const maxWriteChannelBufferSize = 100000 | |
// NewBgWorker returns marketsoter bgworker plugin | |
func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { | |
log.Info("loaded Subscribe config...") | |
writerChannel := make(chan Ticks, maxWriteChannelBufferSize) | |
writer, err := NewWriter(writerChannel, "1Sec") | |
if err != nil { | |
log.Error("failed create writer: %v", err) | |
return nil, err | |
} | |
ctx := context.Background() | |
go writer.Run(ctx) | |
return &DummySubscriber{ | |
config: conf, | |
ctx: ctx, | |
writerChannel: writerChannel, | |
}, nil | |
} | |
func main() { | |
log.Error("kdb writer main invoked") | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pandas as pd | |
import pymarketstore as pymkts | |
# Query the last day of USDJPY 1Sec TICK data from marketstore and print it.
# The endpoint can be overridden with the MARKETSTORE_ENDPOINT variable.
endpoint = os.environ.get("MARKETSTORE_ENDPOINT", "http://localhost:5993/rpc")
mkts = pymkts.Client(endpoint)

# Take the reference time once so `start` and `end` are derived from the same
# instant (the script previously assigned `now` twice; the first was unused).
now = pd.Timestamp.utcnow()
param = pymkts.Param(
    'USDJPY',
    "1Sec",
    "TICK",
    start=now - pd.Timedelta('1D'),
    end=now,
)

# Use the first result set of the reply, converted to a DataFrame.
df = mkts.query(param).first().df()
print(df)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"time" | |
"github.com/alpacahq/marketstore/utils/io" | |
) | |
type (
	// Tick represents a single observation for one symbol.
	Tick struct {
		// time is when the tick was observed.
		time time.Time
		// sym is the symbol name; it forms the first part of the time bucket key.
		sym string
		// askPrices holds ask prices; only the first entry is written out.
		askPrices []float32
	}

	// Ticks is a batch of Tick pointers handled together.
	Ticks []*Tick
)
func (ticks Ticks) convertToCSM(timeFrame string) (io.ColumnSeriesMap, error) { | |
csm := io.NewColumnSeriesMap() | |
for _, tick := range ticks { | |
timeBucketKey := io.NewTimeBucketKeyFromString(tick.sym + "/" + timeFrame + "/TICK") | |
cs, ok := csm[*timeBucketKey] | |
if !ok { | |
cs = io.NewColumnSeries() | |
} | |
cs.AddColumn("Epoch", []int64{tick.time.UnixNano()}) | |
cs.AddColumn("L1-Askprice", getL1Price(tick.askPrices)) | |
csm[*timeBucketKey] = cs | |
} | |
return csm, nil | |
} | |
// getL1Price returns a one-element slice containing the first price, or a
// single zero when prices is empty.
func getL1Price(prices []float32) []float32 {
	var top float32
	if len(prices) > 0 {
		top = prices[0]
	}
	return []float32{top}
}
// getL1Int returns a one-element slice containing the first value, or a
// single zero when value is empty.
func getL1Int(value []int) []int {
	var first int
	if len(value) > 0 {
		first = value[0]
	}
	return []int{first}
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"github.com/alpacahq/marketstore/executor" | |
"github.com/alpacahq/marketstore/utils/io" | |
"github.com/alpacahq/marketstore/utils/log" | |
) | |
// Writer writes Ticks to marketstore | |
type Writer struct { | |
ch chan Ticks | |
timeFrame string | |
} | |
// NewWriter returns new Writer | |
func NewWriter(ch chan Ticks, timeFrame string) (*Writer, error) { | |
return &Writer{ | |
ch: ch, | |
timeFrame: timeFrame, | |
}, nil | |
} | |
// Run start go routine and wait ticks comes | |
func (w *Writer) Run(ctx context.Context) { | |
for { | |
select { | |
case ticks, ok := <-w.ch: | |
if !ok { | |
return | |
} | |
csm, err := ticks.convertToCSM(w.timeFrame) | |
if err != nil { | |
log.Error("ToCSM error %v", err) | |
continue | |
} | |
if err := w.write(csm); err != nil { | |
log.Error("write error %v", err) | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
func (w *Writer) write(csm io.ColumnSeriesMap) error { | |
log.Debug("got CSM: %v", csm) | |
if len(csm) == 0 { | |
log.Info("CSM is empty") | |
return nil | |
} | |
// Note: isVariables always true because we use Tick | |
if err := executor.WriteCSM(csm, true); err != nil { | |
return fmt.Errorf("failed to write CSM %v", err) | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment