Skip to content

Instantly share code, notes, and snippets.

@fanda
Last active March 16, 2016 08:06
Show Gist options
  • Save fanda/babbe6bf1f9703dcfdc1 to your computer and use it in GitHub Desktop.
Save fanda/babbe6bf1f9703dcfdc1 to your computer and use it in GitHub Desktop.
Harvester
import std.stdio; // : writeln, writefln, File;
import std.functional;
import std.string;
import std.array : split;
import vibe.d;
import vibe.db.redis.redis;
import std.process;
import yaml;
// Module constructor: builds the harvester and its HTTP controller, then
// starts an HTTP listener on the loopback addresses (IPv6 and IPv4), port 17700.
shared static this()
{
    auto api = new HarvesterController(new Harvester);

    auto httpSettings = new HTTPServerSettings;
    httpSettings.port = 17700;
    httpSettings.bindAddresses = ["::1", "127.0.0.1"];

    listenHTTP(httpSettings, api.getRouter());
}
/*
Receives Redis pub/sub messages and stores them on disk.

Messages on "markets:<channel>" are base64-decoded and written to
data/<channel>/meta_<date>.yaml; each of them also subscribes the prices
subscriber to "prices:<channel>".
Messages on "prices:<channel>" are base64-decoded, parsed as YAML and written
as one CSV line per market into data/<channel>/<market_key>.csv.
*/
class Harvester
{
    RedisClient redis;
    RedisSubscriber pricesSubscriber, marketsSubscriber;
    Task[string] market_tasks, price_tasks; // subscribe tasks, keyed by channel
    File[string] prices_files;              // open CSV files, keyed by market key
    static long counter = 1;                // suffix that keeps temp file names unique

    // Last values written for a market; used to detect changes.
    struct PriceState {
        long v; // market version ("version" is a D keyword)
        double totalAvailable;
        double totalMatched;
    }
    PriceState[string] last_prices;

public:
    // Creates the Redis client and two subscribers, registering
    // harvestMarkets and harvestPrices as their message callbacks.
    this()
    {
        redis = new RedisClient();
        marketsSubscriber = redis.createSubscriber();
        marketsSubscriber.listen(&this.harvestMarkets);
        pricesSubscriber = redis.createSubscriber();
        pricesSubscriber.listen(&this.harvestPrices);
    }

    // Starts a task that subscribes to "markets:<channel>".
    void subscribe(string channel)
    {
        writeln("S channel ", channel);
        market_tasks[channel] = runTask({
            marketsSubscriber.subscribe("markets:"~channel);
        });
    }

    // Unsubscribes from both channels of `channel` and terminates the
    // associated tasks. A task entry may be missing (the prices task is only
    // created once a markets message has arrived), so lookups are guarded
    // instead of indexing the maps directly.
    void unsubscribe(string channel)
    {
        marketsSubscriber.unsubscribe("markets:"~channel);
        pricesSubscriber.unsubscribe("prices:"~channel);
        if (auto task = channel in market_tasks) {
            task.terminate();
            market_tasks.remove(channel);
        }
        if (auto task = channel in price_tasks) {
            task.terminate();
            price_tasks.remove(channel);
        }
    }

private:
    // Decodes base64 text in-process (no temp file, no external `base64` tool).
    // Whitespace is skipped so line-wrapped input is accepted.
    // Throws: Base64Exception when `data` is not valid base64.
    string decodeBase64(string data)
    {
        import std.ascii : isWhite;
        import std.base64 : Base64;

        char[] compact;
        compact.reserve(data.length);
        foreach (char c; data)
            if (!isWhite(c))
                compact ~= c;
        // The decoded buffer is freshly allocated and not shared.
        return cast(string) Base64.decode(compact);
    }

    // Callback for the markets subscriber: stores the decoded message as
    // data/<channel>/meta_<date>.yaml and subscribes to the channel's prices.
    void harvestMarkets(string channel, string message)
    {
        version(ConsolePrints)
            writeln("H markets on channel ", channel);
        // startsWith is safe for channel names shorter than three characters.
        if (channel.startsWith("Err"))
            return;

        // Decode before touching the file system so a malformed message does
        // not leave an empty meta file behind.
        string decoded;
        try
            decoded = decodeBase64(message);
        catch (Exception e) {
            writeln("E markets on channel ", channel, ": ", e.msg);
            return;
        }

        string channel_name = channel.replace("markets:", "");
        string current_time = to!Date(Clock.currTime()).toString();
        string path = "data/"~channel_name,
               filename = path~"/meta_"~current_time~".yaml";
        std.file.mkdirRecurse(path);
        File f = File(filename, "w");
        f.write(decoded);
        f.close();

        price_tasks[channel_name] = runTask({
            pricesSubscriber.subscribe("prices:" ~ channel_name);
        });
    }

    /*
    Callback for the prices subscriber.
    Decodes and parses the YAML message, then for every market in it:
      - CLOSED markets get their CSV file closed and their state dropped;
      - otherwise a full CSV line is written when version, total_available or
        total_matched changed, and a timestamp-only line when nothing changed.
    */
    void harvestPrices(string channel, string message)
    {
        version(ConsolePrints)
            writeln("H prices on channel ", channel);
        if (channel.startsWith("Err"))
            return;

        string decoded;
        try
            decoded = decodeBase64(message);
        catch (Exception e) {
            writeln("E prices on channel ", channel, ": ", e.msg);
            return;
        }

        // Save for Yaml loader; counter is bumped so names never repeat
        // within this process.
        std.file.mkdirRecurse("tmp");
        string tmpfile = "tmp/"~to!string(thisProcessID)~"_"~to!string(counter++)~".yaml";
        File f = File(tmpfile, "w");
        f.write(decoded);
        f.close();
        scope(exit) std.file.remove(tmpfile);

        // Parse Yaml
        Node data = Loader(tmpfile).load();
        string current_time = Clock.currTime().toISOExtString();
        string channel_name = channel.replace("prices:", "");

        // extract prices
        foreach(string market_key, Node market; data["markets"]) {
            // Handle CLOSED before opening anything: opening with "w" would
            // truncate a CSV that was already harvested and closed.
            if ("CLOSED" == market["status"].as!string) {
                if (auto file = market_key in prices_files) {
                    file.close();
                    prices_files.remove(market_key);
                }
                last_prices.remove(market_key);
                // Keep processing the remaining markets of this message.
                continue;
            }

            if ((market_key in prices_files) is null) {
                prices_files[market_key] = File("data/"~channel_name~"/"~market_key~".csv", "w");
                prices_files[market_key].writeln(
                    "time,status,complete,inplay,total_available,selection_id{total;back=price:size|...;lay=price:size|...} ..."
                );
            }

            immutable ver = market["version"].as!long;
            immutable available = market["total_available"].as!double;
            immutable matched = market["total_matched"].as!double;
            auto last = market_key in last_prices;

            if (
                last is null
                || last.v != ver
                || last.totalAvailable != available
                || last.totalMatched != matched
            ) {
                writeln(current_time~" "~market_key);
                string line = current_time ~ ",";
                line ~= market["status"].as!string ~ ",";
                line ~= market["complete"].as!string ~ ",";
                line ~= market["inplay"].as!string ~ ",";
                line ~= market["total_available"].as!string ~ ",";
                foreach(Node selection; market["selections"]) {
                    line ~= selection["exchange_selection_id"].as!string ~ "{";
                    line ~= selection["total_matched"].as!string ~ ";";
                    foreach(Node price; selection["to_back"])
                        line ~= price["price"].as!string ~ ":" ~ price["size"].as!string ~ "|";
                    line ~= ";";
                    foreach(Node price; selection["to_lay"])
                        line ~= price["price"].as!string ~ ":" ~ price["size"].as!string ~ "|";
                    line ~= "}/";
                }
                prices_files[market_key].writeln(line);
                last_prices[market_key] = PriceState(ver, available, matched);
            }
            else
                prices_files[market_key].writeln(current_time~",,,,,");
        }
    }
}
/*
HTTP front end for a Harvester.
Registers a CORS header handler for every request, an OPTIONS handler, and
the /harvest/:exchange/:channel route that starts harvesting a channel.
*/
class HarvesterController
{
    URLRouter router;
    Harvester harvester;

public:
    // Stores the harvester and builds the router.
    this(Harvester h)
    {
        harvester = h;
        router = new URLRouter;
        router.any("*", &addCORSHeader);
        router.match(HTTPMethod.OPTIONS, "*", &sendOptions);
        // URLRouter.match needs an explicit HTTP method; `any` registers the
        // route for every method.
        router.any("/harvest/:exchange/:channel", &harvest);
    }

    // Returns the router so it can be handed to listenHTTP.
    URLRouter getRouter() {
        return router;
    }

private:
    // Subscribes the harvester to "<exchange>:<channel>" taken from the URL.
    // Responds "OK" on success; on failure logs the request, sets status 500
    // and responds "ERROR" while the exception keeps propagating.
    void harvest(HTTPServerRequest request, HTTPServerResponse response)
    {
        version(ConsolePrints)
            writeln("A harvest ", request.params["channel"]);
        scope(success)
            response.writeBody("OK", "text/plain");
        scope(failure) {
            // Only :exchange and :channel exist on this route. `get` with a
            // default keeps the failure handler itself from throwing.
            writeln("E harvest ", request.params.get("exchange", "?"), ":", request.params.get("channel", "?"));
            response.statusCode = 500;
            response.writeBody("ERROR", "text/plain");
        }
        harvester.subscribe(request.params["exchange"]~":"~request.params["channel"]);
    }

    // CORS: allow requests from any origin.
    void addCORSHeader(HTTPServerRequest req, HTTPServerResponse res)
    {
        res.headers["Access-Control-Allow-Origin"] = "*";
    }

    // Answers OPTIONS requests with the CORS header and an empty body.
    void sendOptions(HTTPServerRequest req, HTTPServerResponse res)
    {
        addCORSHeader(req, res);
        res.writeBody("");
    }
}
// Program entry point: lowers privileges, then runs the event loop and
// returns its result as the process exit code.
int main()
{
    lowerPrivileges();
    immutable exitCode = runEventLoop();
    return exitCode;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment