Skip to content

Instantly share code, notes, and snippets.

@radekg
Created July 18, 2012 21:49
Show Gist options
  • Save radekg/3139153 to your computer and use it in GitHub Desktop.
Save radekg/3139153 to your computer and use it in GitHub Desktop.
erl-lb
-module(lb).
-compile(export_all).
-include_lib("stdlib/include/qlc.hrl").
-include_lib("kernel/include/inet.hrl").
%% Boots the load balancer: opens a Redis connection, clears the Redis server,
%% then starts the proxy listener on port 1883 and the control-channel
%% listener on port 1884. Both listeners are linked to the calling process.
%% Returns the pid of the control-channel listener.
start_lb() ->
    {ok, Client} = eredis:start_link(),
    %% FLUSHALL deletes every key on the Redis server, so the "servers" set
    %% always starts out empty.
    eredis:q(Client, ["FLUSHALL"]),
    ProxyArgs = [1883, Client],
    ControlArgs = [1884, Client],
    spawn_link(?MODULE, loop0, ProxyArgs),
    spawn_link(?MODULE, loop1, ControlArgs).
% -------------------------------------------------------------------------------------------------
% PROXY ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
% -------------------------------------------------------------------------------------------------
%% Opens the proxy listening socket on Port, starts the first acceptor and
%% then enters loop/2, which replaces each acceptor once it has taken a
%% connection. If the socket cannot be opened the failure is printed and the
%% function returns.
loop0(Port, Redis) ->
    ListenOpts = [binary, {reuseaddr, true}, {packet, 0}, {active, true}],
    case gen_tcp:listen(Port, ListenOpts) of
        {ok, LSock} ->
            io:format("Listening on ~p~n", [Port]),
            %% Linked, like every later acceptor started by loop/2: an
            %% unlinked first acceptor that crashed would leave this listener
            %% waiting forever for a next_worker message that never comes.
            spawn_link(?MODULE, worker, [self(), LSock, Redis]),
            loop(LSock, Redis);
        Other ->
            io:format("Can't listen to socket ~p~n", [Other])
    end.
%% Acceptor supervisor loop for the proxy port: every next_worker message
%% starts one more linked worker/3 on the listening socket. Never returns.
loop(ListenSocket, Redis) ->
    receive
        next_worker ->
            spawn_link(?MODULE, worker, [self(), ListenSocket, Redis]),
            loop(ListenSocket, Redis)
    end.
%% Accepts one client connection on the listening socket LS, picks the least
%% busy broker from Redis, connects to it on port 1883 and relays traffic
%% between the two sockets via proxy_data/3.
%%
%% Server is the listener process; it is sent next_worker so that it starts
%% a replacement acceptor.
worker(Server, LS, Redis) ->
    case gen_tcp:accept(LS) of
        {ok, SourceSocket} ->
            %% Ask for the replacement acceptor immediately, so new clients
            %% are not held up by the Redis round trip and the broker
            %% connect below, and so a failure below cannot leave the
            %% listener without an acceptor.
            Server ! next_worker,
            inet:setopts(SourceSocket, [{exit_on_close, false}]),
            Target = binary:bin_to_list(get_single_less_busy(Redis)),
            increase(Target, Redis),
            case gen_tcp:connect(Target, 1883, [binary, {packet, 0}]) of
                {ok, TargetSocket} ->
                    inet:setopts(TargetSocket, [{exit_on_close, false}]),
                    proxy_data(SourceSocket, TargetSocket, Redis);
                {error, ConnectError} ->
                    %% Undo the increment taken above and drop the client
                    %% instead of crashing with the counter left too high.
                    decrease(Target, Redis),
                    gen_tcp:close(SourceSocket),
                    io:format("Can't connect to broker ~p: ~p~n",
                              [Target, ConnectError])
            end;
        {error, Reason} ->
            Server ! next_worker,
            io:format("Can't accept socket ~p~n", [Reason])
    end.
%% Relays data between the client socket Source and the broker socket Target
%% until either side closes, then decrements the broker's connection counter
%% in Redis and closes both sockets. Returns ok.
%%
%% NOTE(review): the counter is decremented under the broker's IP address
%% string, whereas worker/3 increments it under the member name read from
%% Redis. If brokers are registered by hostname the two keys differ; confirm
%% how brokers are registered.
proxy_data(Source, Target, Redis) ->
    %% Resolve the broker address once, while the connection is known to be
    %% up. Asking for the peer name only after a close can fail, and the
    %% resulting badmatch would skip both the decrement and the socket
    %% clean-up.
    Broker =
        case inet:peername(Target) of
            {ok, {Address, _Port}} -> inet_parse:ntoa(Address);
            {error, _Reason} -> undefined
        end,
    proxy_loop(Source, Target, Redis, Broker).

%% Relay loop behind proxy_data/3; Broker is the Redis key to decrement on
%% close, or undefined when the peer address could not be resolved.
proxy_loop(Source, Target, Redis, Broker) ->
    inet:setopts(Source, [{active, once}]),
    receive
        {tcp, Source, Data} ->
            gen_tcp:send(Target, Data),
            proxy_loop(Source, Target, Redis, Broker);
        {tcp, Target, Data} ->
            gen_tcp:send(Source, Data),
            proxy_loop(Source, Target, Redis, Broker);
        {tcp_closed, Closed} when Closed =:= Source; Closed =:= Target ->
            %% Whichever side hung up, the session is over for both.
            release_broker(Broker, Redis),
            gen_tcp:close(Target),
            gen_tcp:close(Source),
            ok
    end.

%% Gives the broker's connection slot back; a no-op when the broker address
%% is unknown.
release_broker(undefined, _Redis) ->
    ok;
release_broker(Broker, Redis) ->
    decrease(Broker, Redis),
    ok.
% -------------------------------------------------------------------------------------------------
% CONTROL CHANNEL ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
% -------------------------------------------------------------------------------------------------
%% Opens the control-channel listening socket on Port, starts the first
%% control acceptor and then enters loop_control/2. If the socket cannot be
%% opened the failure is printed and the function returns.
%%
%% The socket is opened without the binary option, so received data is
%% delivered as a list of bytes.
loop1(Port, Redis) ->
    ListenOpts = [{reuseaddr, true}, {packet, 0}, {active, true}],
    case gen_tcp:listen(Port, ListenOpts) of
        {ok, LSock} ->
            io:format("Control channel listening on ~p~n", [Port]),
            %% Linked, like every later acceptor started by loop_control/2:
            %% an unlinked first acceptor that crashed would leave this
            %% listener waiting forever for next_worker_control.
            spawn_link(?MODULE, worker_control, [self(), LSock, Redis]),
            loop_control(LSock, Redis);
        Other ->
            io:format("Control channel can't listen to socket ~p~n", [Other])
    end.
%% Acceptor supervisor loop for the control port: every next_worker_control
%% message starts one more linked worker_control/3 on the listening socket.
%% Never returns.
loop_control(ListenSocket, Redis) ->
    receive
        next_worker_control ->
            spawn_link(?MODULE, worker_control, [self(), ListenSocket, Redis]),
            loop_control(ListenSocket, Redis)
    end.
%% Accepts one control connection on the listening socket LS, reads a single
%% packet from it and registers the packet contents as a broker via
%% add_broker/2, then closes the connection.
%%
%% Server is the control listener process; it is sent next_worker_control so
%% that it starts a replacement acceptor.
worker_control(Server, LS, Redis) ->
    case gen_tcp:accept(LS) of
        {ok, SourceSocket} ->
            %% Ask for the replacement acceptor before waiting for data.
            %% Otherwise a client that connects and stays silent, or
            %% disconnects without sending, would block the control channel
            %% for everyone else.
            Server ! next_worker_control,
            inet:setopts(SourceSocket, [{active, once}]),
            receive
                {tcp, SourceSocket, Data} ->
                    io:format("Adding broker: ~p.~n", [Data]),
                    add_broker(Data, Redis);
                {tcp_closed, SourceSocket} ->
                    io:format("Control client closed without registering a broker~n");
                {tcp_error, SourceSocket, SocketError} ->
                    io:format("Control socket error ~p~n", [SocketError])
            end,
            gen_tcp:close(SourceSocket),
            ok;
        {error, Reason} ->
            Server ! next_worker_control,
            io:format("Can't accept socket ~p~n", [Reason])
    end.
% -------------------------------------------------------------------------------------------------
% REDIS ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
% -------------------------------------------------------------------------------------------------
%% Registers Hostname as a member of the "servers" sorted set with a score
%% (connection count) of 0. Returns the eredis:q/2 result.
add_broker(Hostname, Redis) ->
    Command = ["ZADD", "servers", 0, Hostname],
    eredis:q(Redis, Command).
%% Returns the members of the "servers" sorted set whose score lies between
%% 0 and 1000000, as delivered by eredis.
%%
%% A dedicated Redis client is started for the query; it is stopped again
%% before returning so repeated calls do not pile up open connections.
show_all() ->
    {ok, LocalRedis} = eredis:start_link(),
    try
        {ok, Results} = eredis:q(LocalRedis, ["ZRANGEBYSCORE", "servers", 0, 1000000]),
        Results
    after
        eredis:stop(LocalRedis)
    end.
%% Returns the first member of the "servers" sorted set with a score between
%% 0 and 1000000, i.e. the broker with the fewest recorded connections.
%% Fails with badmatch when the query returns no member.
get_single_less_busy(Redis) ->
    Query = ["ZRANGEBYSCORE", "servers", 0, 1000000, "limit", 0, 1],
    {ok, Members} = eredis:q(Redis, Query),
    [LeastBusy | _] = Members,
    LeastBusy.
%% Adds 1 to Host's score in the "servers" sorted set. Returns the
%% eredis:q/2 result.
increase(Host, Redis) ->
    Command = ["ZINCRBY", "servers", 1, Host],
    eredis:q(Redis, Command).
%% Subtracts 1 from Host's score in the "servers" sorted set. Returns the
%% eredis:q/2 result.
decrease(Host, Redis) ->
    Command = ["ZINCRBY", "servers", -1, Host],
    eredis:q(Redis, Command).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment