Last active
March 16, 2016 08:06
-
-
Save fanda/babbe6bf1f9703dcfdc1 to your computer and use it in GitHub Desktop.
Harvester
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import std.stdio; // : writeln, writefln, File; | |
import std.functional; | |
import std.string; | |
import std.array : split; | |
import vibe.d; | |
import vibe.db.redis.redis; | |
import std.process; | |
import yaml; | |
shared static this() | |
{ | |
auto harvester = new Harvester; | |
auto controller = new HarvesterController(harvester); | |
auto settings = new HTTPServerSettings; | |
settings.bindAddresses = ["::1", "127.0.0.1"]; | |
settings.port = 17700; | |
listenHTTP(settings, controller.getRouter()); | |
} | |
class Harvester | |
{ | |
RedisClient redis; | |
RedisSubscriber pricesSubscriber, marketsSubscriber; | |
Task[string] market_tasks, price_tasks;// threads | |
File[string] prices_files;// file storage | |
static long counter = 1; | |
struct PriceState { | |
long v; // version is keyword | |
double totalAvailable; | |
double totalMatched; | |
} | |
PriceState[string] last_prices; | |
public: | |
this() // constructor | |
{ | |
redis = new RedisClient(); | |
marketsSubscriber = redis.createSubscriber(); | |
marketsSubscriber.listen(&this.harvestMarkets); | |
pricesSubscriber = redis.createSubscriber(); | |
pricesSubscriber.listen(&this.harvestPrices); | |
} | |
void subscribe(string channel) | |
{ | |
writeln("S channel ", channel); | |
market_tasks[channel] = runTask({ | |
marketsSubscriber.subscribe("markets:"~channel); | |
}); | |
} | |
void unsubscribe(string channel) { | |
marketsSubscriber.unsubscribe("markets:"~channel); | |
pricesSubscriber.unsubscribe("prices:"~channel); | |
market_tasks[channel].terminate(); | |
price_tasks[channel].terminate(); | |
market_tasks.remove(channel); | |
price_tasks.remove(channel); | |
} | |
private: | |
string decodeBase64(string data) | |
{ | |
string tmpfile = "tmp/"~to!string(thisProcessID)~"_"~to!string(counter); | |
File f = File(tmpfile, "w"); | |
f.write(data); | |
f.close(); | |
scope(exit) std.file.remove(tmpfile); | |
string decoded_data = executeShell("base64 -d "~tmpfile)[1]; | |
return decoded_data; | |
} | |
void harvestMarkets(string channel, string message) | |
{ | |
version(ConsolePrints) | |
writeln("H markets on channel ", channel); | |
if (channel[0..3] == "Err") | |
return; | |
string current_time = to!Date(Clock.currTime()).toString(); | |
string path = "data/"~channel.replace("markets:", ""), | |
filename = path~"/meta_"~current_time~".yaml"; | |
std.file.mkdirRecurse(path); | |
File f = File(filename, "w"); | |
f.write( decodeBase64(message) ); | |
f.close(); | |
string channel_name = channel.replace("markets:", ""); | |
price_tasks[channel_name] = runTask({ | |
pricesSubscriber.subscribe("prices:" ~ channel_name); | |
}); | |
} | |
/* | |
Analyze every incoming price data. | |
Get prices only for subscribed markets. | |
Save prices into buffer, write once in second | |
*/ | |
void harvestPrices(string channel, string message) | |
{ | |
version(ConsolePrints) | |
writeln("H prices on channel ", channel); | |
if (channel[0..3] == "Err") | |
return; | |
// Save for Yaml loader | |
string tmpfile = "tmp/"~to!string(thisProcessID)~"_"~to!string(counter)~".yaml"; | |
File f = File(tmpfile, "w"); | |
f.write( decodeBase64(message) ); | |
f.close(); | |
scope(exit) std.file.remove(tmpfile); | |
// Parse Yaml | |
Node data = Loader(tmpfile).load(); | |
string current_time = Clock.currTime().toISOExtString(); | |
// extract prices | |
foreach(string market_key, Node market; data["markets"]) { | |
if ((market_key in prices_files) == null) { | |
prices_files[market_key] = File("data/"~channel.replace("prices:", "")~"/"~market_key~".csv", "w"); | |
prices_files[market_key].writeln( | |
"time,status,complete,inplay,total_available,selection_id{total;back=price:size|...;lay=price:size|...} ..." | |
); | |
} | |
if ("CLOSED" == market["status"].as!string) { | |
prices_files[market_key].close(); | |
prices_files.remove(market_key); | |
last_prices.remove(market_key); | |
return; | |
} | |
if ( | |
(market_key in last_prices) == null | |
|| | |
last_prices[market_key].v != market["version"].as!long | |
|| | |
last_prices[market_key].totalAvailable != market["total_available"].as!double | |
|| | |
last_prices[market_key].totalMatched != market["total_matched"].as!double | |
) { | |
writeln(current_time~" "~market_key); | |
string line = current_time ~ ","; | |
line ~= market["status"].as!string ~ ","; | |
line ~= market["complete"].as!string ~ ","; | |
line ~= market["inplay"].as!string ~ ","; | |
line ~= market["total_available"].as!string ~ ","; | |
foreach(Node selection; market["selections"]) { | |
line ~= selection["exchange_selection_id"].as!string ~ "{"; | |
line ~= selection["total_matched"].as!string ~ ";"; | |
foreach(Node price; selection["to_back"]) | |
line ~= price["price"].as!string ~ ":" ~ price["size"].as!string ~ "|"; | |
line ~= ";"; | |
foreach(Node price; selection["to_lay"]) | |
line ~= price["price"].as!string ~ ":" ~ price["size"].as!string ~ "|"; | |
line ~= "}/"; | |
} | |
prices_files[market_key].writeln(line); | |
last_prices[market_key] = PriceState( | |
market["version"].as!long, | |
market["total_available"].as!double, | |
market["total_matched"].as!double | |
); | |
} | |
else | |
prices_files[market_key].writeln(current_time~",,,,,"); | |
} | |
} | |
} | |
class HarvesterController | |
{ | |
URLRouter router; | |
Harvester harvester; | |
public: | |
this(Harvester h) // constructor | |
{ | |
harvester = h; | |
router = new URLRouter; | |
router.any("*", &addCORSHeader); | |
router.match(HTTPMethod.OPTIONS, "*", &sendOptions); | |
router.match("/harvest/:exchange/:channel", &harvest); | |
} | |
URLRouter getRouter() { | |
return router; | |
} | |
private: | |
void harvest(HTTPServerRequest request, HTTPServerResponse response) | |
{ | |
version(ConsolePrints) | |
writeln("A harvest ", request.params["channel"]); | |
scope(success) | |
response.writeBody("OK", "text/plain"); | |
scope(failure) { | |
writeln("E harvest ", request.params["data_type"], request.params["channel"]); | |
response.statusCode = 500; | |
response.writeBody("ERROR", "text/plain"); | |
} | |
harvester.subscribe(request.params["exchange"]~":"~request.params["channel"]); | |
/*else { | |
writeln("E Unknown data_type"); | |
throw new Exception("Uknown data type"); | |
}*/ | |
} | |
// CORS | |
void addCORSHeader(HTTPServerRequest req, HTTPServerResponse res) | |
{ | |
res.headers["Access-Control-Allow-Origin"] = "*"; | |
} | |
void sendOptions(HTTPServerRequest req, HTTPServerResponse res) | |
{ | |
addCORSHeader(req, res); | |
res.writeBody(""); | |
} | |
} | |
int main() | |
{ | |
lowerPrivileges(); | |
return runEventLoop(); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment