Created
December 25, 2024 03:47
-
-
Save monotykamary/cc046e8e5ca398b29c22c11c2611f5f4 to your computer and use it in GitHub Desktop.
Coordinated Binance Collection with IP Rotation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Nghenhan.BinanceCollector.Supervisor do
  @moduledoc """
  Supervision tree for Binance collection.

  Starts, in order:

    * the unique-key registry used to name collectors,
    * the IP coordinator,
    * a FLAME pool for REST work,
    * a FLAME pool for WebSocket collectors.

  Each child is restarted individually (`:one_for_one`).
  """
  use Supervisor

  alias Nghenhan.BinanceCollector

  @doc "Starts the supervisor, registered under its own module name."
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    Supervisor.init(
      [
        registry_spec(),
        {BinanceCollector.Coordinator, []},
        rest_pool_spec(),
        websocket_pool_spec()
      ],
      strategy: :one_for_one
    )
  end

  # Registry with unique keys; collectors register themselves through it.
  defp registry_spec do
    {Registry, keys: :unique, name: BinanceCollector.Registry}
  end

  # REST pool: 1..10 runners, 5 concurrent calls each, idle runners stop after 30 s.
  defp rest_pool_spec do
    {FLAME.Pool,
     name: BinanceCollector.RestPool,
     min: 1,
     max: 10,
     max_concurrency: 5,
     idle_shutdown_after: 30_000}
  end

  # WebSocket pool: 2..5 runners, 2 placed children each, idle runners stop after 60 s.
  defp websocket_pool_spec do
    {FLAME.Pool,
     name: BinanceCollector.WebSocketPool,
     min: 2,
     max: 5,
     max_concurrency: 2,
     idle_shutdown_after: 60_000}
  end
end
defmodule Nghenhan.BinanceCollector.Coordinator do
  @moduledoc """
  Hands out egress IPs for Binance requests and tracks how many requests each
  IP has made in the current rate-limit window.

  This process owns three named ETS tables:

    * `:binance_ips` - registered IPs with their registration time (ms)
    * `:binance_collectors` - reserved for collector tracking
    * `:binance_request_counts` - per-IP request counters; public so that
      callers can bump them via `increment_request_count/1` without a call

  Request counters are cleared at the end of every window. Without that reset
  the counts only ever grow and an IP stays rate limited permanently once it
  reaches the limit.

  Because the tables and the process are named, only one coordinator can run
  per node.

  ## Options

    * `:rate_limit_window` - window length in milliseconds (default `60_000`)
    * `:max_requests_per_ip` - requests allowed per IP per window (default `1200`)
  """
  use GenServer
  require Logger

  # Default length of the fixed request-counting window (1 minute).
  @rate_limit_window 60_000
  # Default requests allowed per IP within one window (Binance limit).
  @max_requests_per_ip 1200
  # Interval between IP rotation passes (5 minutes).
  @ip_rotation_interval 300_000

  defmodule State do
    @moduledoc false
    defstruct [
      :ips_table,
      :collectors_table,
      :request_counter_table,
      :rate_limit_window,
      :max_requests_per_ip
    ]
  end

  # Public API

  @doc "Starts the coordinator, registered under its module name. See module docs for options."
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Registers `ip` as usable, recording the registration time in milliseconds."
  @spec register_ip(term()) :: :ok
  def register_ip(ip) do
    GenServer.call(__MODULE__, {:register_ip, ip})
  end

  @doc """
  Returns the registered IP with the fewest requests in the current window.

  Returns `{:error, :rate_limited}` when no IP is registered or every IP has
  reached the per-window limit.
  """
  @spec get_available_ip() :: {:ok, term()} | {:error, :rate_limited}
  def get_available_ip do
    GenServer.call(__MODULE__, :get_available_ip)
  end

  @doc """
  Atomically increments the request counter for `ip` and returns the new count.

  Writes straight to the public ETS table, so it must run on the node that
  hosts the coordinator.
  """
  @spec increment_request_count(term()) :: integer()
  def increment_request_count(ip) do
    :ets.update_counter(:binance_request_counts, ip, {2, 1}, {ip, 0})
  end

  # Server Implementation

  @impl true
  def init(opts) do
    ips_table = :ets.new(:binance_ips, [:set, :protected, :named_table])
    collectors_table = :ets.new(:binance_collectors, [:set, :protected, :named_table])
    # Public so other processes can increment counters without a GenServer call.
    counter_table = :ets.new(:binance_request_counts, [:set, :public, :named_table])

    state = %State{
      ips_table: ips_table,
      collectors_table: collectors_table,
      request_counter_table: counter_table,
      rate_limit_window: Keyword.get(opts, :rate_limit_window, @rate_limit_window),
      max_requests_per_ip: Keyword.get(opts, :max_requests_per_ip, @max_requests_per_ip)
    }

    schedule_ip_rotation()
    schedule_counter_reset(state.rate_limit_window)
    {:ok, state}
  end

  @impl true
  def handle_call({:register_ip, ip}, _from, state) do
    :ets.insert(state.ips_table, {ip, System.system_time(:millisecond)})
    {:reply, :ok, state}
  end

  def handle_call(:get_available_ip, _from, state) do
    {:reply, find_available_ip(state), state}
  end

  @impl true
  def handle_info(:rotate_ips, state) do
    rotate_ips(state)
    schedule_ip_rotation()
    {:noreply, state}
  end

  # End of a rate-limit window: every IP starts the next window at zero.
  def handle_info(:reset_request_counts, state) do
    :ets.delete_all_objects(state.request_counter_table)
    schedule_counter_reset(state.rate_limit_window)
    {:noreply, state}
  end

  # Private Functions

  # Picks the IP with the lowest count that is still under the per-window limit.
  defp find_available_ip(state) do
    state.ips_table
    |> :ets.tab2list()
    |> Enum.map(fn {ip, _registered_at} ->
      {ip, request_count(state.request_counter_table, ip)}
    end)
    |> Enum.filter(fn {_ip, count} -> count < state.max_requests_per_ip end)
    |> case do
      [] -> {:error, :rate_limited}
      candidates -> {:ok, candidates |> Enum.min_by(&elem(&1, 1)) |> elem(0)}
    end
  end

  # Counter lookup defaulting to 0; avoids :ets.lookup_element/4 (OTP 26+ only).
  defp request_count(table, ip) do
    case :ets.lookup(table, ip) do
      [{^ip, count}] -> count
      [] -> 0
    end
  end

  defp rotate_ips(_state) do
    # Implement IP rotation strategy
    # Could involve provisioning new IPs or cycling through a pool
    Logger.info("Rotating IPs")
  end

  defp schedule_ip_rotation do
    Process.send_after(self(), :rotate_ips, @ip_rotation_interval)
  end

  defp schedule_counter_reset(window) do
    Process.send_after(self(), :reset_request_counts, window)
  end
end
defmodule Nghenhan.BinanceCollector.REST do
  @moduledoc """
  Collects data from Binance REST APIs with automatic retries and rate limiting.

  The egress IP is chosen, and its request counted, on the calling node before
  the work is shipped to the `Nghenhan.BinanceCollector.RestPool` FLAME pool.
  The coordinator is reached by a node-local registered name, and its counters
  live in a node-local named ETS table. Doing this inside the FLAME closure
  would therefore consult the runner node instead of this one.
  """
  require Logger

  alias Nghenhan.BinanceCollector.Coordinator

  # Wait between attempts when every registered IP is rate limited (ms).
  @rate_limit_backoff 1_000

  # Default Binance REST base URL; override with the :binance_rest_base_url app env key.
  @default_base_url "https://api.binance.com"

  @doc """
  Fetches trades for `symbol` between `start_time` and `end_time` and writes
  them to the landing zone.

  Blocks, retrying every second, until the coordinator reports a free IP.
  Returns the landing-zone write result on success, or `{:error, reason}` when
  the request fails. NOTE(review): `start_time`/`end_time` are presumably
  millisecond Unix timestamps, as Binance expects; confirm with callers.
  """
  def collect_trades(symbol, start_time, end_time) do
    ip = acquire_ip()
    # Counted here so the increment lands in the table this node's coordinator reads.
    Coordinator.increment_request_count(ip)

    FLAME.call(Nghenhan.BinanceCollector.RestPool, fn ->
      do_collect_trades(ip, symbol, start_time, end_time)
    end)
  end

  # Loops until the coordinator hands out an IP; sleeping here, outside
  # FLAME.call, avoids holding a runner slot while waiting.
  defp acquire_ip do
    case Coordinator.get_available_ip() do
      {:ok, ip} ->
        ip

      {:error, :rate_limited} ->
        Process.sleep(@rate_limit_backoff)
        acquire_ip()
    end
  end

  # Runs on the FLAME runner: performs the request and writes the result.
  defp do_collect_trades(ip, symbol, start_time, end_time) do
    headers = [
      {"X-MBX-APIKEY", Application.get_env(:nghenhan, :binance_api_key)}
    ]

    url = build_trades_url(symbol, start_time, end_time)

    case make_request(ip, url, headers) do
      {:ok, trades} ->
        # Write directly to landing zone
        Nghenhan.LandingZone.write_event("trades", %{
          source: "binance_rest",
          symbol: symbol,
          timestamp: System.system_time(:millisecond),
          data: trades
        })

      {:error, reason} ->
        Logger.error("Failed to collect trades: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Spot aggregated-trades endpoint, which accepts a startTime/endTime range.
  defp build_trades_url(symbol, start_time, end_time) do
    base_url = Application.get_env(:nghenhan, :binance_rest_base_url, @default_base_url)

    query =
      URI.encode_query([
        {"symbol", symbol},
        {"startTime", start_time},
        {"endTime", end_time}
      ])

    base_url <> "/api/v3/aggTrades?" <> query
  end

  # Placeholder for the HTTP call. It must bind the outgoing connection to
  # `ip` and return `{:ok, trades}` or `{:error, reason}`. Until an HTTP client
  # is wired in, it reports :not_implemented so callers take the error branch
  # instead of crashing on an unmatched return value.
  defp make_request(_ip, _url, _headers) do
    {:error, :not_implemented}
  end
end
defmodule Nghenhan.BinanceCollector.WebSocket do
  @moduledoc """
  Maintains WebSocket connections to Binance with automatic reconnection.
  Uses FLAME process placement for distribution across IPs.

  The IP is acquired on the calling node in `start_collector/1`, because the
  coordinator is reached by a node-local registered name. A placed child
  calling it from a FLAME runner would not reach this node's coordinator.
  """
  use GenServer
  require Logger

  alias Nghenhan.BinanceCollector.Coordinator

  # Delay between failed connection attempts (ms).
  @reconnect_delay 1_000

  @doc """
  Places a collector for `symbol` on the WebSocket FLAME pool.

  Returns the result of `FLAME.place_child/2`, or `{:error, :rate_limited}`
  when the coordinator has no free IP.
  """
  def start_collector(symbol) do
    with {:ok, ip} <- Coordinator.get_available_ip() do
      spec = %{
        id: {__MODULE__, symbol},
        start: {__MODULE__, :start_link, [symbol, ip]},
        restart: :transient
      }

      FLAME.place_child(Nghenhan.BinanceCollector.WebSocketPool, spec)
    end
  end

  @doc """
  Starts a collector for `symbol`, registered in `Nghenhan.BinanceCollector.Registry`.

  When `ip` is `nil`, the IP is requested from the local coordinator during init.
  """
  def start_link(symbol, ip \\ nil) do
    GenServer.start_link(__MODULE__, {symbol, ip}, name: via_tuple(symbol))
  end

  @impl true
  def init({symbol, nil}) do
    case Coordinator.get_available_ip() do
      {:ok, ip} -> init({symbol, ip})
      {:error, reason} -> {:stop, reason}
    end
  end

  def init({symbol, ip}) do
    state = %{
      symbol: symbol,
      ip: ip,
      connection: nil
    }

    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state), do: attempt_connect(state)

  # Messages other than :reconnect and text frames have no clause. An unexpected
  # message crashes the collector, and its :transient restart reconnects from scratch.
  @impl true
  def handle_info(:reconnect, state), do: attempt_connect(state)

  def handle_info({:gun_ws, _conn, _stream, {:text, data}}, state) do
    parsed = Jason.decode!(data)
    # Write directly to landing zone
    Nghenhan.LandingZone.write_event("trades", %{
      source: "binance_ws",
      symbol: state.symbol,
      timestamp: System.system_time(:millisecond),
      data: parsed
    })

    {:noreply, state}
  end

  # Tries to connect. On failure it schedules a retry message instead of
  # sleeping, so the process keeps serving its mailbox in the meantime.
  defp attempt_connect(state) do
    case connect(state) do
      {:ok, conn} ->
        {:noreply, %{state | connection: conn}}

      {:error, reason} ->
        Logger.error("Failed to connect: #{inspect(reason)}")
        Process.send_after(self(), :reconnect, @reconnect_delay)
        {:noreply, state}
    end
  end

  # Placeholder: open the WebSocket bound to state.ip (e.g. with gun) and
  # return {:ok, conn} or {:error, reason}. Until then it reports
  # :not_implemented so attempt_connect/1 takes the retry branch instead of
  # crashing on an unmatched nil.
  defp connect(_state) do
    {:error, :not_implemented}
  end

  defp via_tuple(symbol) do
    {:via, Registry, {Nghenhan.BinanceCollector.Registry, {__MODULE__, symbol}}}
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment