Created
February 13, 2018 17:29
-
-
Save ddrscott/8a9a88e0eb523a55f93a7b19d7bd005e to your computer and use it in GitHub Desktop.
GDAX WebSocket listener. Publishes to a Redis queue.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env ruby | |
# Listens to a GDAX WebSocket feed, subscribes to the `ticker` channel and
# republishes every in-order ticker as a JSON datum on a Redis channel,
# while keeping simple running stats that are pushed to FeedUI.
class GdaxTicker
  DEFAULT_WS_URL = 'wss://ws-feed.gdax.com'.freeze
  PUBLISH_CHANNEL = 'altalarms:datum'.freeze

  attr_accessor :writer, :stats
  attr_reader :socket, :feed_url

  # Builds a listener with the default feed URL and starts it.
  #
  # @return [Object] whatever #run returns
  def self.main
    # This class defines no `instance` method (it is not a Singleton), so a
    # fresh listener is constructed here.
    new.run
  end

  # @param feed_url [String, nil] WebSocket endpoint; DEFAULT_WS_URL when nil
  def initialize(feed_url: nil)
    @feed_url = feed_url || DEFAULT_WS_URL
    # Both hashes yield 0 for unknown keys and store the key on first read.
    @sequences = Hash.new { |hash, key| hash[key] = 0 }
    @stats = Hash.new { |hash, key| hash[key] = 0 }
  end

  # Handles a `ticker` message: drops stale updates, publishes the datum to
  # Redis and refreshes the stats line.
  #
  # Fields noted by the original author as available on ticker messages:
  # :open_24h, :volume_24h, :low_24h, :high_24h, :volume_30d, :price,
  # :best_bid, :best_ask, :last_size
  #
  # @param message [Hash] parsed ticker message with symbol keys
  # @return [Object, nil] result of #print_stats, or nil when the message is skipped
  def on_ticker(message)
    # A message without a timestamp or a sequence number cannot be ordered;
    # comparing a nil sequence below would raise NoMethodError.
    return unless message[:time] && message[:sequence]

    product = message[:product_id]
    # ignore out of sequence updates
    return unless message[:sequence] > @sequences[product]

    @sequences[product] = message[:sequence]
    datum = {
      time: Time.parse(message[:time]).to_i,
      sequence: message[:sequence],
      source: 'gdax',
      product: product,
      side: message[:side],
      price: message[:price].to_f,
      qty: message[:last_size].to_f
    }
    AlarmRunner.redis.publish(PUBLISH_CHANNEL, datum.to_json)

    stats[:tickers] += 1
    stats[product.to_s] = datum[:price].round(2)
    print_stats(message[:time])
  end

  # Handles the `subscriptions` message; intentionally a no-op.
  #
  # @param _msg [Hash] parsed message (unused)
  def on_subscriptions(_msg)
    # ignore
  end

  # Sends the current stats to FeedUI as a "key: value, ..." line.
  #
  # @param _time [String] timestamp of the triggering message (currently unused)
  def print_stats(_time)
    FeedUI.status('GDAX', stats.map { |key, value| "#{key}: #{value}" }.join(', '))
  end

  # Parses a raw JSON frame and dispatches it to the `on_<type>` handler,
  # logging a warning when no such handler exists.
  #
  # @param data [String] raw JSON text received from the socket
  def on_data(data)
    message = JSON.parse(data, symbolize_names: true)
    handler = "on_#{message[:type]}"
    # `true` lets private handlers be found as well.
    if respond_to?(handler, true)
      __send__(handler, message)
    else
      logger.warn("Unsupported message type: #{message[:type]}. Try creating an `on_#{message[:type]}` method or remove the type from the channel.")
    end
  end

  # Connects to the feed and wires up the open/message/close callbacks.
  # The client is kept in @socket so the `socket` reader returns it.
  def run
    @socket = WebSocket::EventMachine::Client.connect(uri: feed_url)
    ws = @socket
    # Subscribe as soon as the connection is open.
    ws.onopen { ws.send(connect_payload.to_json) }
    ws.onmessage { |msg, _type| on_data(msg) }
    ws.onclose { |code, _reason| puts "Disconnected with status code: #{code}" }
  end

  # @return [Array<String>] product ids built as "<digital>-<physical>" pairs
  def product_ids
    digital = %w[BCH LTC ETH BTC]
    physical = %w[USD]
    digital.product(physical).map { |pair| pair.join('-') }
  end

  # @return [Hash] the subscribe request sent when the socket opens
  def connect_payload
    {
      type: 'subscribe',
      product_ids: product_ids,
      channels: ['ticker']
    }
  end

  # The class previously defined `logger` twice; the later SemanticLogger
  # definition was the one in effect and is the one kept here.
  #
  # @return [Object] logger named after this class
  def logger
    SemanticLogger[self.class.to_s]
  end
end
# Script entry point: when this file is executed directly, load the runtime
# dependencies, configure logging and start the GDAX listener.
if __FILE__ == $PROGRAM_NAME
  require 'dotenv/load'
  require 'active_support/all'
  require 'em-hiredis'
  require 'eventmachine'
  # NOTE(review): GdaxTicker#run references WebSocket::EventMachine::Client,
  # which this require presumably does not provide (it looks like the
  # websocket-eventmachine-client gem's namespace) -- confirm which gem is
  # intended before relying on this script.
  require 'faye/websocket'
  require 'influxdb'
  require 'json'
  require 'semantic_logger'
  require 'time'

  SemanticLogger.default_level = :debug
  SemanticLogger.add_appender(file_name: 'log/gdax_feed.log', formatter: :color)

  require_relative './influx_writer.rb'
  require_relative '../alarms/alarm_runner.rb'

  # The class defined in this file is GdaxTicker (the script previously called
  # the undefined GdaxFeed). EventMachine-based clients need a running
  # reactor, so the listener is started inside EventMachine.run.
  EventMachine.run { GdaxTicker.main }
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment