Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Created December 25, 2024 03:47
Show Gist options
  • Save monotykamary/cc046e8e5ca398b29c22c11c2611f5f4 to your computer and use it in GitHub Desktop.
Save monotykamary/cc046e8e5ca398b29c22c11c2611f5f4 to your computer and use it in GitHub Desktop.
Coordinated Binance Collection with IP Rotation
defmodule Nghenhan.BinanceCollector.Supervisor do
  @moduledoc """
  Top-level supervisor for the Binance collection subsystem.

  Starts, in order, the collector `Registry`, the
  `Nghenhan.BinanceCollector.Coordinator`, and two `FLAME.Pool`s: one for REST
  collection and one for WebSocket collectors. Children are supervised with a
  `:one_for_one` strategy.
  """

  use Supervisor

  @doc """
  Starts the supervisor and registers it under this module's name.

  `opts` is handed to `init/1`, which currently ignores it.
  """
  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # Pool used by the REST collector.
    rest_pool =
      {FLAME.Pool,
       name: Nghenhan.BinanceCollector.RestPool,
       min: 1,
       max: 10,
       max_concurrency: 5,
       idle_shutdown_after: 30_000}

    # Pool used to place WebSocket collector processes.
    websocket_pool =
      {FLAME.Pool,
       name: Nghenhan.BinanceCollector.WebSocketPool,
       min: 2,
       max: 5,
       max_concurrency: 2,
       idle_shutdown_after: 60_000}

    Supervisor.init(
      [
        {Registry, keys: :unique, name: Nghenhan.BinanceCollector.Registry},
        {Nghenhan.BinanceCollector.Coordinator, []},
        rest_pool,
        websocket_pool
      ],
      strategy: :one_for_one
    )
  end
end
defmodule Nghenhan.BinanceCollector.Coordinator do
  @moduledoc """
  Tracks the IPs available for Binance requests and how many requests each IP
  has made in the current rate-limit window.

  State lives in three named ETS tables owned by this process:

    * `:binance_ips` - `{ip, registered_at_ms}` rows, written via `register_ip/1`
    * `:binance_collectors` - created here but not otherwise used in this module
    * `:binance_request_counts` - `{ip, count}` rows; public so that callers can
      bump counters directly through `increment_request_count/1`

  Request counters are cleared once per rate-limit window (a fixed window, not
  a sliding one), so an IP that hit its limit becomes available again when the
  next window starts.
  """

  use GenServer
  require Logger

  # Length of one rate-limit window in milliseconds (1 minute).
  @rate_limit_window 60_000
  # Maximum requests per IP within one window (Binance limit).
  @max_requests_per_ip 1200
  # Interval between IP rotation runs in milliseconds (5 minutes).
  @ip_rotation_interval 300_000

  @ips_table :binance_ips
  @collectors_table :binance_collectors
  @request_counts_table :binance_request_counts

  defmodule State do
    @moduledoc false
    defstruct [:ips_table, :collectors_table, :request_counter_table]
  end

  # Public API

  @doc """
  Starts the coordinator, registered under this module's name.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Registers `ip` as available, stamped with the current system time.

  Returns `:ok`. Registering the same IP again overwrites its timestamp.
  """
  def register_ip(ip) do
    GenServer.call(__MODULE__, {:register_ip, ip})
  end

  @doc """
  Returns `{:ok, ip}` for the registered IP with the fewest requests in the
  current window, or `{:error, :rate_limited}` when no registered IP is below
  the per-window limit (including when no IP is registered at all).
  """
  def get_available_ip do
    GenServer.call(__MODULE__, :get_available_ip)
  end

  @doc """
  Atomically increments the request counter for `ip` and returns the new count.

  Writes straight to the public counter table, so it does not go through the
  coordinator process; the table must already exist on the calling node.
  """
  def increment_request_count(ip) do
    :ets.update_counter(@request_counts_table, ip, {2, 1}, {ip, 0})
  end

  # Server Implementation

  @impl true
  def init(_opts) do
    # Table for IP tracking
    ips_table = :ets.new(@ips_table, [:set, :protected, :named_table])
    # Table for collector tracking
    collectors_table = :ets.new(@collectors_table, [:set, :protected, :named_table])
    # Table for request counting; public so callers can update counters directly
    counter_table = :ets.new(@request_counts_table, [:set, :public, :named_table])

    schedule_ip_rotation()
    schedule_counter_reset()

    {:ok,
     %State{
       ips_table: ips_table,
       collectors_table: collectors_table,
       request_counter_table: counter_table
     }}
  end

  @impl true
  def handle_call({:register_ip, ip}, _from, state) do
    :ets.insert(state.ips_table, {ip, System.system_time(:millisecond)})
    {:reply, :ok, state}
  end

  def handle_call(:get_available_ip, _from, state) do
    {:reply, find_available_ip(state), state}
  end

  @impl true
  def handle_info(:rotate_ips, state) do
    rotate_ips(state)
    schedule_ip_rotation()
    {:noreply, state}
  end

  # Starts a new rate-limit window. Without this the counters only ever grow
  # and an IP that reaches the limit would stay rate limited forever.
  def handle_info(:reset_request_counts, state) do
    :ets.delete_all_objects(state.request_counter_table)
    schedule_counter_reset()
    {:noreply, state}
  end

  # Private Functions

  # Picks the registered IP with the lowest request count that is still below
  # the per-window limit.
  defp find_available_ip(state) do
    state.ips_table
    |> :ets.tab2list()
    |> Enum.map(fn {ip, _registered_at} ->
      {ip, request_count(state.request_counter_table, ip)}
    end)
    |> Enum.filter(fn {_ip, count} -> count < @max_requests_per_ip end)
    |> case do
      [] ->
        {:error, :rate_limited}

      candidates ->
        {ip, _count} = Enum.min_by(candidates, fn {_ip, count} -> count end)
        {:ok, ip}
    end
  end

  # Current request count for `ip`, or 0 when it has no counter row yet.
  # Uses :ets.lookup/2 rather than :ets.lookup_element/4 so it also works on
  # OTP releases older than 26.
  defp request_count(table, ip) do
    case :ets.lookup(table, ip) do
      [{^ip, count}] -> count
      [] -> 0
    end
  end

  defp rotate_ips(_state) do
    # Implement IP rotation strategy
    # Could involve provisioning new IPs or cycling through a pool
    Logger.info("Rotating IPs")
  end

  defp schedule_ip_rotation do
    Process.send_after(self(), :rotate_ips, @ip_rotation_interval)
  end

  defp schedule_counter_reset do
    Process.send_after(self(), :reset_request_counts, @rate_limit_window)
  end
end
defmodule Nghenhan.BinanceCollector.REST do
  @moduledoc """
  Collects data from Binance REST APIs with automatic retries and rate limiting.
  """

  # Logger.error/1 is a macro, so the module must require Logger to compile.
  require Logger

  alias Nghenhan.BinanceCollector.Coordinator

  # Delay before asking the coordinator for an IP again after being rate limited.
  @rate_limit_retry_delay 1_000

  # NOTE(review): the original never defined the URL builder; the aggregate
  # trades endpoint is assumed because it accepts a time range - confirm this
  # is the endpoint the collector is meant to use.
  @agg_trades_endpoint "https://api.binance.com/api/v3/aggTrades"

  @doc """
  Collects trades for `symbol` between `start_time` and `end_time` inside the
  REST FLAME pool.

  Waits (retrying once per second, indefinitely) until the coordinator hands
  out an IP. On success returns whatever `Nghenhan.LandingZone.write_event/2`
  returns; on a failed request returns `{:error, reason}`.
  """
  def collect_trades(symbol, start_time, end_time) do
    # NOTE(review): this closure calls the Coordinator process and its ETS
    # counter table by local name. If the pool's backend runs the closure on a
    # different node, those must exist there too - confirm against deployment.
    FLAME.call(Nghenhan.BinanceCollector.RestPool, fn ->
      collect_with_available_ip(symbol, start_time, end_time)
    end)
  end

  # Retries in place while rate limited. The original re-entered
  # collect_trades/3 here, which nested a new FLAME.call inside the running one
  # for every retry.
  defp collect_with_available_ip(symbol, start_time, end_time) do
    case Coordinator.get_available_ip() do
      {:ok, ip} ->
        do_collect_trades(ip, symbol, start_time, end_time)

      {:error, :rate_limited} ->
        Process.sleep(@rate_limit_retry_delay)
        collect_with_available_ip(symbol, start_time, end_time)
    end
  end

  defp do_collect_trades(ip, symbol, start_time, end_time) do
    # Configure HTTP client with IP
    headers = [
      {"X-MBX-APIKEY", Application.get_env(:nghenhan, :binance_api_key)}
    ]

    url = build_trades_url(symbol, start_time, end_time)

    case make_request(ip, url, headers) do
      {:ok, trades} ->
        # Write directly to landing zone
        Nghenhan.LandingZone.write_event("trades", %{
          source: "binance_rest",
          symbol: symbol,
          timestamp: System.system_time(:millisecond),
          data: trades
        })

      {:error, reason} ->
        Logger.error("Failed to collect trades: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Builds the request URL with the symbol and time range as query parameters.
  # Assumes start_time/end_time are Unix timestamps in milliseconds - TODO confirm.
  defp build_trades_url(symbol, start_time, end_time) do
    query = URI.encode_query(symbol: symbol, startTime: start_time, endTime: end_time)
    @agg_trades_endpoint <> "?" <> query
  end

  # Counts the request against `ip`, then performs it.
  defp make_request(ip, _url, _headers) do
    Coordinator.increment_request_count(ip)
    # Make HTTP request with configured IP
    # Implementation depends on HTTP client choice
    #
    # Until a client is wired in, report a tagged error so the caller's `case`
    # matches instead of raising CaseClauseError on the counter value.
    {:error, :not_implemented}
  end
end
defmodule Nghenhan.BinanceCollector.WebSocket do
  @moduledoc """
  Maintains WebSocket connections to Binance with automatic reconnection.
  Uses FLAME process placement for distribution across IPs.
  """

  use GenServer
  require Logger

  alias Nghenhan.BinanceCollector.Coordinator

  # Delay between connection attempts, in milliseconds.
  @reconnect_delay 1_000

  @doc """
  Places a collector process for `symbol` in the WebSocket FLAME pool.

  Returns the result of `FLAME.place_child/2`.
  """
  def start_collector(symbol) do
    spec = %{
      id: {__MODULE__, symbol},
      start: {__MODULE__, :start_link, [symbol]},
      restart: :transient
    }

    FLAME.place_child(Nghenhan.BinanceCollector.WebSocketPool, spec)
  end

  @doc """
  Starts a collector for `symbol`, registered in the collector registry under
  `{Nghenhan.BinanceCollector.WebSocket, symbol}`.
  """
  def start_link(symbol) do
    GenServer.start_link(__MODULE__, symbol, name: via_tuple(symbol))
  end

  @impl true
  def init(symbol) do
    # Stop with the coordinator's reason instead of raising MatchError when no
    # IP is available, so start_link/1 returns a clean {:error, reason}.
    case Coordinator.get_available_ip() do
      {:ok, ip} ->
        state = %{symbol: symbol, ip: ip, connection: nil}
        {:ok, state, {:continue, :connect}}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  def handle_continue(:connect, state) do
    {:noreply, try_connect(state)}
  end

  # Retry tick scheduled by try_connect/1 after a failed attempt.
  @impl true
  def handle_info(:connect, state) do
    {:noreply, try_connect(state)}
  end

  def handle_info({:gun_ws, _conn, _stream, {:text, data}}, state) do
    case Jason.decode(data) do
      {:ok, parsed} ->
        # Write directly to landing zone
        Nghenhan.LandingZone.write_event("trades", %{
          source: "binance_ws",
          symbol: state.symbol,
          timestamp: System.system_time(:millisecond),
          data: parsed
        })

      {:error, reason} ->
        # A single malformed frame should not take the collector down.
        Logger.error("Failed to decode WebSocket frame: #{inspect(reason)}")
    end

    {:noreply, state}
  end

  # Without a catch-all, any other message delivered to this process would
  # raise FunctionClauseError and crash the collector.
  # NOTE(review): disconnect notifications also land here and are ignored;
  # reconnect-on-disconnect still needs to be implemented with connect/1.
  def handle_info(message, state) do
    Logger.debug("Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Attempts to connect and returns the updated state. On failure, logs and
  # schedules another attempt with a timer rather than sleeping, so the process
  # stays responsive between attempts.
  defp try_connect(state) do
    case connect(state) do
      {:ok, conn} ->
        %{state | connection: conn}

      {:error, reason} ->
        Logger.error("Failed to connect: #{inspect(reason)}")
        Process.send_after(self(), :connect, @reconnect_delay)
        state
    end
  end

  defp connect(_state) do
    # Implement WebSocket connection with IP binding
    # Using gun or other WebSocket client
    #
    # Until a client is wired in, report a tagged error so try_connect/1
    # matches instead of raising CaseClauseError on nil.
    {:error, :not_implemented}
  end

  defp via_tuple(symbol) do
    {:via, Registry, {Nghenhan.BinanceCollector.Registry, {__MODULE__, symbol}}}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment