@ddrscott
Created February 13, 2018 17:29
GDAX WebSocket listener. Publishes ticker updates to a Redis pub/sub channel.
#!/usr/bin/env ruby

# Listens to the GDAX WebSocket ticker feed and publishes each ticker update to Redis.
class GdaxTicker
  attr_accessor :writer, :stats
  attr_reader :socket, :feed_url

  DEFAULT_WS_URL = 'wss://ws-feed.gdax.com'
  PUBLISH_CHANNEL = 'altalarms:datum'

  # Must be called from inside a running EventMachine reactor.
  def self.main
    new.run
  end

  def initialize(feed_url: nil)
    @feed_url = feed_url || DEFAULT_WS_URL
    # Last sequence number seen per product; used to drop stale updates.
    @sequences = Hash.new { |h, k| h[k] = 0 }
    @stats = Hash.new { |h, k| h[k] = 0 }
  end

  # Available stats (fields on the ticker message):
  # :open_24h, :volume_24h, :low_24h, :high_24h, :volume_30d, :price, :best_bid, :best_ask, :last_size
  def on_ticker(message)
    # Skip ticker messages that carry no timestamp.
    return unless message[:time]

    # Ignore out-of-sequence updates.
    return unless message[:sequence] > @sequences[message[:product_id]]

    @sequences[message[:product_id]] = message[:sequence]
    datum = {
      time: Time.parse(message[:time]).to_i,
      sequence: message[:sequence],
      source: 'gdax',
      product: message[:product_id],
      side: message[:side],
      price: message[:price].to_f,
      qty: message[:last_size].to_f
    }
    AlarmRunner.redis.publish(PUBLISH_CHANNEL, datum.to_json)

    stats[:tickers] += 1
    stats[message[:product_id]] = datum[:price].round(2)
    print_stats(message[:time])
  end

  def on_subscriptions(_msg)
    # Subscription acknowledgements need no handling.
  end

  def print_stats(_time)
    FeedUI.status('GDAX', stats.map { |k, v| "#{k}: #{v}" }.join(', '))
  end

  # Parses a raw frame and dispatches it to the matching `on_<type>` handler.
  def on_data(data)
    message = JSON.parse(data, symbolize_names: true)
    method = "on_#{message[:type]}"
    if respond_to?(method, true)
      __send__(method, message)
    else
      logger.warn("Unsupported message type: #{message[:type]}. Try creating an `on_#{message[:type]}` method or remove the type from the channel.")
    end
  end

  def run
    ws = WebSocket::EventMachine::Client.connect(uri: feed_url)

    ws.onopen do
      ws.send(connect_payload.to_json)
    end

    ws.onmessage do |msg, _type|
      on_data(msg)
    end

    ws.onclose do |code, _reason|
      puts "Disconnected with status code: #{code}"
    end
  end

  # Builds the product list, e.g. "BTC-USD", from every digital/physical pair.
  def product_ids
    digital = %w[BCH LTC ETH BTC]
    physical = %w[USD]
    digital.product(physical).map { |m| m.join('-') }
  end

  def connect_payload
    {
      type: 'subscribe',
      product_ids: product_ids,
      channels: ['ticker']
    }
  end

  def logger
    SemanticLogger[self.class.to_s]
  end
end

if __FILE__ == $PROGRAM_NAME
  require 'dotenv/load'
  require 'active_support/all'
  require 'em-hiredis'
  require 'eventmachine'
  require 'faye/websocket'
  require 'influxdb'
  require 'json'
  require 'semantic_logger'
  require 'time'
  # Provides WebSocket::EventMachine::Client, which GdaxTicker#run uses.
  require 'websocket-eventmachine-client'

  SemanticLogger.default_level = :debug
  SemanticLogger.add_appender(file_name: 'log/gdax_feed.log', formatter: :color)

  require_relative './influx_writer.rb'
  require_relative '../alarms/alarm_runner.rb'

  # The WebSocket client needs a running EventMachine reactor.
  EM.run { GdaxTicker.main }
end
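
The dispatch and sequence-filter logic above can be exercised without a network connection or Redis. The following is a minimal sketch, not part of the original gist: `TickerSketch`, `sample_frame`, and the sample values are hypothetical names and data invented for illustration. Datums are collected in an array where the real class calls `AlarmRunner.redis.publish`.

```ruby
require 'json'
require 'time'

# Hypothetical stand-in for GdaxTicker: same `on_<type>` dispatch and
# per-product sequence filter, but it stores datums instead of publishing.
class TickerSketch
  attr_reader :published, :unsupported

  def initialize
    @sequences = Hash.new(0) # last sequence number seen per product
    @published = []          # datums that would have gone to Redis
    @unsupported = []        # message types with no handler
  end

  # Parse a raw JSON frame and route it to on_<type> if that method exists.
  def on_data(data)
    message = JSON.parse(data, symbolize_names: true)
    handler = "on_#{message[:type]}"
    if respond_to?(handler, true)
      __send__(handler, message)
    else
      @unsupported << message[:type]
    end
  end

  private

  def on_ticker(message)
    return unless message[:time]

    product = message[:product_id]
    # Drop anything at or below the last sequence seen for this product.
    return unless message[:sequence] > @sequences[product]

    @sequences[product] = message[:sequence]
    @published << {
      time: Time.parse(message[:time]).to_i,
      sequence: message[:sequence],
      source: 'gdax',
      product: product,
      side: message[:side],
      price: message[:price].to_f,
      qty: message[:last_size].to_f
    }
  end
end

# Builds a ticker frame with made-up sample values.
def sample_frame(sequence, price)
  {
    type: 'ticker', product_id: 'BTC-USD', sequence: sequence,
    time: '2018-02-13T17:29:00Z', side: 'buy', price: price, last_size: '0.5'
  }.to_json
end

sketch = TickerSketch.new
sketch.on_data(sample_frame(10, '8500.10'))
sketch.on_data(sample_frame(9, '8499.00'))    # stale sequence, dropped
sketch.on_data(sample_frame(11, '8501.25'))
sketch.on_data({ type: 'heartbeat' }.to_json) # no on_heartbeat handler
puts sketch.published.map { |d| d[:price] }.inspect
```

Because handlers are looked up by name from the incoming `type` field, supporting a new message type only requires defining another `on_<type>` method. The trade-off is that the feed controls which method name is looked up, so the `on_` prefix is what keeps arbitrary methods from being called.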