Skip to content

Instantly share code, notes, and snippets.

@maxlapshin
Created March 17, 2019 16:38
Show Gist options
  • Save maxlapshin/3ece6fc9202f052ce1161a27e0e176c2 to your computer and use it in GitHub Desktop.
Save maxlapshin/3ece6fc9202f052ce1161a27e0e176c2 to your computer and use it in GitHub Desktop.
-module(webrtc_view). % Copy of webrtc_publish2
-behaviour(cowboy_websocket_handler).
-include_lib("erlmedia/include/video_frame.hrl").
-include_lib("erlmedia/include/media_info.hrl").
-export([live_stream/4]).
-export([init/3, terminate/3]).
-export([websocket_init/3, websocket_info/3, websocket_handle/3, websocket_terminate/3]).
%% @doc HTTP entry point for viewing stream `Name' over WebRTC.
%%
%% Returns `{ok, {400, [], Body}}' when `webrtc_off' is `true' in the
%% `Options' map. Otherwise the request must carry an `Upgrade: websocket'
%% header (compared case-insensitively); it is then upgraded through
%% `cowboy_websocket:upgrade/4' with `[Name, Options]' as handler options and
%% `{done, Req1}' is returned. A missing or non-websocket Upgrade header
%% yields `flu_www:bad_request()'.
live_stream(Req, Env, Name, Options) ->
    case maps:get(webrtc_off, Options, false) of
        true ->
            {ok, {400, [], "Not available\n"}};
        _ ->
            {Upgrade, Req1} = cowboy_req:header(<<"upgrade">>, Req),
            case to_lower(Upgrade) of
                <<"websocket">> ->
                    %% Exceptions raised during the upgrade are logged, not
                    %% propagated; the request is reported as done either way.
                    try cowboy_websocket:upgrade(Req1, Env, ?MODULE, [Name, Options])
                    catch
                        C:E:ST ->
                            events:info("Error in loop: ~p:~p\n~p",[C,E,ST])
                    end,
                    {done, Req1};
                _ ->
                    %% Covers both a missing header and any other Upgrade
                    %% value, which previously crashed with case_clause.
                    flu_www:bad_request()
            end
    end.
%% @doc Cowboy handler init: upgrade to the websocket protocol when the
%% request carries `Upgrade: websocket' (case-insensitive); otherwise reply
%% 400 "Bad request" and shut the handler down.
init(_, Req, _) ->
    {Upgrade, Req1} = cowboy_req:header(<<"upgrade">>, Req),
    case to_lower(Upgrade) of
        <<"websocket">> ->
            {upgrade, protocol, cowboy_websocket};
        _ ->
            %% Missing header or any non-websocket Upgrade value; the latter
            %% previously crashed with case_clause instead of replying.
            {ok, Req2} = cowboy_req:reply(400, [], "Bad request\n", Req1),
            {shutdown, Req2, none}
    end.
%% Lower-case a header value with cowboy_bstr:to_lower/1, passing the atom
%% `undefined' (header absent) through unchanged.
to_lower(Value) ->
    case Value of
        undefined -> undefined;
        _ -> cowboy_bstr:to_lower(Value)
    end.
% Per-connection state of the websocket handler.
% Fields marked "not assigned in the code visible here" may be leftovers from
% the module this one was copied from — TODO confirm before removing.
-record(publish, {
timer, % reference of the pending `ping` timer (erlang:send_after), or undefined
player, % webrtc_player state; undefined until initialize_with_mi/3 has run
socket, % UDP socket opened with gen_udp:open/2 in initialize_with_mi/3
ice_pwd_local, % STUN; not assigned in the code visible here
ice_pwd_remote, % STUN; not assigned in the code visible here
dtls, % not assigned in the code visible here
srtp_params, % not assigned in the code visible here
stream_name, % stream name taken from handler options or the `name` query parameter
media_info, % not assigned in the code visible here
our_ip, % local address of the websocket connection, formatted by inet_ntoa/1
our_port, % not assigned in the code visible here (the UDP port is passed to the player via options)
session_id, % value of cowboy_req:meta(session_id, Req); passed to flu_session:add_bytes/2
peer_addr, % source host of the most recent UDP packet; destination for outgoing UDP
peer_port % source port of the most recent UDP packet; destination for outgoing UDP
}).
% Delay in milliseconds before the next `ping` message is sent to the handler
% (used with erlang:send_after/3). websocket_init/3 returns twice this value
% as the timeout element of its result.
-define(WS_TIMEOUT, 5000).
%% @doc Websocket handler initialisation.
%%
%% With empty handler options the stream name is read from the `name' query
%% parameter and the call is re-dispatched with `[StreamName, []]'.
%% The main clause sets a send timeout on the connection socket, queues an
%% `init' message to itself, tags events with the stream name and peer IP,
%% subscribes to the stream and builds the initial #publish{} state.
websocket_init(Proto, Req, []) ->
    {NameFromQuery, ReqAfterQs} = cowboy_req:qs_val(<<"name">>, Req),
    websocket_init(Proto, ReqAfterQs, [NameFromQuery, []]);
websocket_init(_, Req, [StreamName, _Opts]) ->
    [ConnSock, Transport] = cowboy_req:get([socket, transport], Req),
    Transport:setopts(ConnSock, [{send_timeout, 4000}]),
    %% Queued before subscribing, so `init' sits in the mailbox ahead of
    %% anything the subscription delivers.
    self() ! init,
    {ok, {LocalAddr, _}} = Transport:sockname(ConnSock),
    {ok, {RemoteAddr, _}} = Transport:peername(ConnSock),
    RemoteIP = inet_ntoa(RemoteAddr),
    events:md([{media, StreamName}, {ip, RemoteIP}]),
    {ok, {stream, _}} = flu_media:start_and_subscribe(StreamName, [{proto, m4s}]),
    LocalIP = inet_ntoa(LocalAddr),
    {SessionId, _} = cowboy_req:meta(session_id, Req),
    InitialState = #publish{
        our_ip = LocalIP,
        stream_name = StreamName,
        session_id = SessionId
    },
    {ok, Req, InitialState, 2 * ?WS_TIMEOUT}.
%% @doc Format an IP address tuple as a binary.
%%
%% A 4-tuple (IPv4) is rendered as dotted decimal, an 8-tuple (IPv6) is
%% wrapped in square brackets. Matching on the tuple shape (rather than
%% `size/1', which also accepts binaries of that byte size) makes any other
%% term fail with function_clause.
inet_ntoa({_, _, _, _} = Addr) ->
    iolist_to_binary(inet:ntoa(Addr));
inet_ntoa({_, _, _, _, _, _, _, _} = Addr) ->
    iolist_to_binary([$[, inet:ntoa(Addr), $]]).
%% Set up the player for a stream whose media info is known: arm the first
%% ping timer, open a UDP socket on an OS-assigned port, create the
%% webrtc_player with our IP/port prepended to the media info options, and
%% reply to the client with the JSON-encoded offer.
initialize_with_mi(#media_info{options = Options} = MediaInfo, Req, #publish{our_ip = OurIP} = State) ->
    PingTimer = erlang:send_after(?WS_TIMEOUT, self(), ping),
    {ok, UdpSock} = gen_udp:open(0, [binary, {active, once}]),
    {ok, UdpPort} = inet:port(UdpSock),
    PlayerOptions = [{our_ip, OurIP}, {our_port, UdpPort} | Options],
    {ok, Player} = webrtc_player:init(MediaInfo#media_info{options = PlayerOptions}),
    OfferJson = jsx:encode(webrtc_player:offer(Player)),
    NewState = State#publish{timer = PingTimer, player = Player, socket = UdpSock},
    {reply, {text, OfferJson}, Req, NewState}.
%% @doc Handle Erlang messages delivered to the websocket process.
%%
%% Messages handled:
%%   init                       - look up media info; retry in 2 s if absent
%%   ping                       - send player reports over UDP and a ws ping
%%   {udp, ...}                 - feed an incoming UDP packet to the player
%%   {dtls, flight, Data}       - forward DTLS data to the peer over UDP
%%   {dtls, key_material, P}    - hand key material to the player
%%   #media_info{}              - initialize the player or refresh the offer
%%   #video_frame{}             - turn a frame into packets and send them
%%   {timeout, _, {send_frame,_}} - send the player's next frame
%%   refresh_auth               - refresh the session
%%   binary                     - forwarded to the client as a text frame
%% Anything else is logged as unexpected and ignored.
websocket_info(init, Req, #publish{player = Player} = State) when Player =/= undefined ->
    %% A retry scheduled by the clause below can fire after a #media_info{}
    %% message has already initialized the player. Initializing again would
    %% open a second UDP socket (leaking the first) and arm a second ping
    %% timer, so the stale retry is dropped.
    {ok, Req, State};
websocket_info(init, Req, #publish{stream_name = StreamName} = State) ->
    case live_stream:media_info(StreamName) of
        undefined ->
            erlang:send_after(2000, self(), init),
            {ok, Req, State};
        #media_info{} = MediaInfo ->
            initialize_with_mi(MediaInfo, Req, State)
    end;
websocket_info(ping, Req, #publish{timer = Old, player = Player, socket = Sock,
                                   peer_addr = Host, peer_port = Port} = State) ->
    Old =/= undefined andalso erlang:cancel_timer(Old),
    {ok, Responses, Player1} = webrtc_player:send_srs(Player),
    lists:foreach(fun(Reply) -> gen_udp:send(Sock, Host, Port, Reply) end, Responses),
    %% The next ping timer is armed when the client's pong arrives.
    {reply, {ping, <<>>}, Req, State#publish{timer = undefined, player = Player1}};
websocket_info({udp, Sock, Host, Port, Packet}, Req, #publish{player = Player} = State) ->
    {ok, Responses, Player1} = webrtc_player:handle_udp(Host, Port, Packet, Player),
    %% The socket is {active, once}; re-arm it for the next packet.
    inet:setopts(Sock, [{active,once}]),
    lists:foreach(fun(Reply) -> gen_udp:send(Sock, Host, Port, Reply) end, Responses),
    %% Remember the sender as the destination for outgoing UDP traffic.
    {ok, Req, State#publish{player = Player1, peer_addr = Host, peer_port = Port}};
websocket_info({dtls, flight, Data}, Req, #publish{socket = Sock, peer_addr = SrcHost,
                                                   peer_port = SrcPort} = State) ->
    ok = gen_udp:send(Sock, SrcHost, SrcPort, Data),
    {ok, Req, State};
websocket_info({dtls, key_material, SrtpParams}, Req, #publish{player = Player} = State) ->
    {ok, Player1} = webrtc_player:handle_key_material(SrtpParams, Player),
    {ok, Req, State#publish{player = Player1}};
% Disabled, incomplete draft kept from the original source:
% websocket_info({udp, Sock, Host, Port, <<?RTP_VERSION:2, _:14, _:48, SSRC:32, _/binary>> = SRTP}, Req,
% #publish{srtp_params = #srtp_params{server_write_SRTP_master_key = SKey, server_write_SRTP_master_salt = SSalt}, srtp_dec = undefined} = State) ->
% Dec = srtp:new_ctx(SSRC, srtpEncryptionAESCM, srtpAuthenticationSha1Hmac, SKey, SSalt)
websocket_info(#media_info{} = MediaInfo, Req, #publish{player = undefined} = State) ->
    initialize_with_mi(MediaInfo, Req, State);
websocket_info(#media_info{} = MI, Req, #publish{player = Player} = State) ->
    OldOffer = webrtc_player:offer(Player),
    {ok, Player1} = webrtc_player:handle_media_info(MI, Player),
    NewOffer = webrtc_player:offer(Player1),
    %% Only bother the client when the offer actually changed.
    if
        OldOffer == NewOffer ->
            {ok, Req, State#publish{player = Player1}};
        true ->
            {reply, {text, jsx:encode(NewOffer)}, Req, State#publish{player = Player1}}
    end;
websocket_info(#video_frame{} = Frame, Req, #publish{player = Player, socket = Socket,
                                                     peer_addr = Addr, peer_port = Port} = State) ->
    case webrtc_player:handle_frame(Frame, Player) of
        {ok, Player1} ->
            {ok, Req, State#publish{player = Player1}};
        {ok, Packets, Player1} ->
            lists:foreach(fun(Packet) -> gen_udp:send(Socket, Addr, Port, Packet) end, Packets),
            flu_session:add_bytes(State#publish.session_id, iolist_size(Packets)),
            {ok, Req, State#publish{player = Player1}}
    end;
websocket_info({timeout, _, {send_frame, _DTS}}, Req, #publish{player = Player, socket = Socket,
                                                               peer_addr = Addr, peer_port = Port} = State) ->
    {ok, Packets, Player1} = webrtc_player:send_next_frame(Player),
    lists:foreach(fun(Packet) -> gen_udp:send(Socket, Addr, Port, Packet) end, Packets),
    flu_session:add_bytes(State#publish.session_id, iolist_size(Packets)),
    {ok, Req, State#publish{player = Player1}};
websocket_info(refresh_auth, Req, State) ->
    flu_session:inprocess_refresh(),
    {ok, Req, State};
websocket_info(Bin, Req, State) when is_binary(Bin) ->
    {reply, {text, Bin}, Req, State};
websocket_info(Message, Req, State) ->
    events:warning("Unexpected message: ~P", [Message, 30]),
    {ok, Req, State}.
%% @doc Handle frames received from the websocket client.
%%
%% Text frames are decoded as JSON (maps) and passed to
%% webrtc_player:handle_json/2; a result carrying a response is sent back as
%% JSON text. A pong schedules the next `ping' message after ?WS_TIMEOUT.
%% Any other frame is logged and ignored.
websocket_handle({text, JSON}, Req, #publish{player = Player} = State) ->
    Message = jsx:decode(JSON, [return_maps]),
    case webrtc_player:handle_json(Message, Player) of
        {ok, R, Player1} ->
            {reply, {text, jsx:encode(R)}, Req, State#publish{player = Player1}};
        {ok, Player1} ->
            {ok, Req, State#publish{player = Player1}}
    end;
websocket_handle({pong, _}, Req, #publish{timer = Old} = State) ->
    %% Cancel a still-pending ping timer before arming a new one. Without
    %% this, a pong received while a timer is pending (e.g. an unsolicited
    %% pong) leaves two timers running, and every resulting ping/pong round
    %% keeps the duplicate alive.
    Old =/= undefined andalso erlang:cancel_timer(Old),
    Ref = erlang:send_after(?WS_TIMEOUT, self(), ping),
    {ok, Req, State#publish{timer = Ref}};
websocket_handle(Data, Req, #publish{} = State) ->
    events:info("Unknown ws message: ~p", [Data]),
    {ok, Req, State}.
%% Websocket termination callback: no explicit cleanup is performed here;
%% all three arguments are ignored.
websocket_terminate(_, _, _) ->
    ok.
%% HTTP handler termination callback: ignores its arguments and returns ok.
terminate(_Reason, _Req, _State) ->
    ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment