Created
March 17, 2019 16:38
-
-
Save maxlapshin/3ece6fc9202f052ce1161a27e0e176c2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(webrtc_view). % Copy of webrtc_publish2 | |
-behaviour(cowboy_websocket_handler). | |
-include_lib("erlmedia/include/video_frame.hrl"). | |
-include_lib("erlmedia/include/media_info.hrl"). | |
-export([live_stream/4]). | |
-export([init/3, terminate/3]). | |
-export([websocket_init/3, websocket_info/3, websocket_handle/3, websocket_terminate/3]). | |
%% Entry point used to serve a WebRTC view of stream `Name' over an
%% already accepted HTTP request.
%%
%% Returns `{ok, {400, [], Body}}' when the `webrtc_off' key of the `Options'
%% map is `true'. Otherwise the request must carry an `Upgrade: websocket'
%% header (compared case-insensitively): the connection is then upgraded with
%% this module as the websocket handler and `{done, Req1}' is returned.
%% Any other value of the header, or its absence, yields
%% `flu_www:bad_request()'.
live_stream(Req, Env, Name, Options) ->
    case maps:get(webrtc_off, Options, false) of
        true ->
            {ok, {400, [], "Not available\n"}};
        _ ->
            {Upgrade, Req1} = cowboy_req:header(<<"upgrade">>, Req),
            case to_lower(Upgrade) of
                <<"websocket">> ->
                    %% The upgrade call runs the websocket session; an exception
                    %% escaping it is logged and `{done, Req1}' is still returned.
                    try cowboy_websocket:upgrade(Req1, Env, ?MODULE, [Name, Options])
                    catch
                        C:E:ST ->
                            events:info("Error in loop: ~p:~p\n~p", [C, E, ST])
                    end,
                    {done, Req1};
                _ ->
                    %% Header missing or naming another protocol: the header is
                    %% client-controlled, so reject instead of crashing with
                    %% case_clause.
                    flu_www:bad_request()
            end
    end.
%% cowboy handler callback: accepts only websocket upgrade requests.
%%
%% When the `Upgrade' header equals `websocket' (case-insensitively) the
%% request is handed over to `cowboy_websocket'. Every other request, with
%% the header absent or naming a different protocol, is answered with
%% `400 Bad request' and the handler shuts down.
init(_, Req, _) ->
    {Upgrade, Req1} = cowboy_req:header(<<"upgrade">>, Req),
    case to_lower(Upgrade) of
        <<"websocket">> ->
            {upgrade, protocol, cowboy_websocket};
        _ ->
            %% The header value comes from the client; anything that is not
            %% a websocket upgrade is a bad request rather than a crash.
            {ok, Req2} = cowboy_req:reply(400, [], "Bad request\n", Req1),
            {shutdown, Req2, none}
    end.
%% Lower-cases a header value with `cowboy_bstr:to_lower/1', passing the atom
%% `undefined' (header not present) through unchanged.
to_lower(Value) ->
    case Value of
        undefined -> undefined;
        _ -> cowboy_bstr:to_lower(Value)
    end.
%% State kept by the websocket handler for one viewer connection.
-record(publish, {
    timer = undefined,          % pending `ping' timer reference (erlang:send_after), undefined while none is armed
    player = undefined,         % state returned by webrtc_player:init/1; undefined until media info is known
    socket = undefined,         % UDP socket opened in initialize_with_mi/3
    ice_pwd_local = undefined,  % STUN
    ice_pwd_remote = undefined, % STUN
    dtls = undefined,           % not written anywhere in this module
    srtp_params = undefined,    % not written anywhere in this module
    stream_name = undefined,    % name of the stream being viewed, set in websocket_init/3
    media_info = undefined,     % not written anywhere in this module
    our_ip = undefined,         % local address of the websocket connection, formatted by inet_ntoa/1
    our_port = undefined,       % not written anywhere in this module
    session_id = undefined,     % `session_id' meta value of the request, used for byte accounting
    peer_addr = undefined,      % source address of the most recent incoming UDP packet
    peer_port = undefined       % source port of the most recent incoming UDP packet
}).

%% Delay in milliseconds before the next websocket ping is sent; twice this
%% value is passed to cowboy as the websocket timeout in websocket_init/3.
-define(WS_TIMEOUT, 5000).
%% cowboy_websocket_handler callback: prepares the per-connection state.
%%
%% First clause (handler started without options): the stream name is taken
%% from the `name' query-string parameter. The parameter is client supplied;
%% when it is missing (`undefined') or present without a value (`true') the
%% upgrade is refused with `{shutdown, Req}' instead of using a non-binary
%% term as the stream name.
%%
%% Second clause (`[StreamName, Opts]'): sets a send timeout on the transport
%% socket, queues an `init' message to this process, tags the log metadata
%% with the stream name and peer address, subscribes to the stream and returns
%% the initial `#publish{}' state with a websocket timeout of 2 * WS_TIMEOUT.
websocket_init(Proto, Req, []) ->
    case cowboy_req:qs_val(<<"name">>, Req) of
        {StreamName, Req1} when is_binary(StreamName) ->
            websocket_init(Proto, Req1, [StreamName, []]);
        {_, Req1} ->
            {shutdown, Req1}
    end;
websocket_init(_, Req, [StreamName, _Opts]) ->
    [Socket, Transport] = cowboy_req:get([socket, transport], Req),
    Transport:setopts(Socket, [{send_timeout, 4000}]),
    %% Initialisation that needs media info is deferred to websocket_info(init, ...).
    self() ! init,
    {ok, {LocalAddress, _}} = Transport:sockname(Socket),
    {ok, {RemoteAddress, _}} = Transport:peername(Socket),
    PeerIP = inet_ntoa(RemoteAddress),
    events:md([{media, StreamName}, {ip, PeerIP}]),
    {ok, {stream, _}} = flu_media:start_and_subscribe(StreamName, [{proto, m4s}]),
    OurIP = inet_ntoa(LocalAddress),
    {SessionId, _} = cowboy_req:meta(session_id, Req),
    State0 = #publish{
        session_id = SessionId,
        stream_name = StreamName,
        our_ip = OurIP
    },
    {ok, Req, State0, 2 * ?WS_TIMEOUT}.
%% Formats an IP address tuple as a binary.
%%
%% A 4-tuple (IPv4) becomes its dotted form, e.g. `<<"127.0.0.1">>'.
%% An 8-tuple (IPv6) is wrapped in square brackets, e.g. `<<"[::1]">>'.
%% Any other argument fails with `function_clause'.
inet_ntoa(Addr) when tuple_size(Addr) =:= 4 ->
    iolist_to_binary(inet:ntoa(Addr));
inet_ntoa(Addr) when tuple_size(Addr) =:= 8 ->
    %% Built as an iolist; it is flattened once by iolist_to_binary/1.
    iolist_to_binary([$[, inet:ntoa(Addr), $]]).
%% Creates the WebRTC player once media info is known and sends the offer.
%%
%% Arms the first `ping' timer, opens a UDP socket on an OS-assigned port in
%% `{active, once}' mode, initialises `webrtc_player' with the media info
%% extended by the `our_ip' / `our_port' options, and replies to the browser
%% with the JSON-encoded offer. The timer, player and socket are stored in
%% the returned state.
initialize_with_mi(#media_info{options = BaseOpts} = MI, Req, #publish{our_ip = LocalIP} = State) ->
    PingTimer = erlang:send_after(?WS_TIMEOUT, self(), ping),
    {ok, UdpSocket} = gen_udp:open(0, [binary, {active, once}]),
    {ok, LocalPort} = inet:port(UdpSocket),
    PlayerOpts = [{our_ip, LocalIP}, {our_port, LocalPort} | BaseOpts],
    {ok, Player} = webrtc_player:init(MI#media_info{options = PlayerOpts}),
    OfferJson = jsx:encode(webrtc_player:offer(Player)),
    NewState = State#publish{timer = PingTimer, player = Player, socket = UdpSocket},
    {reply, {text, OfferJson}, Req, NewState}.
%% cowboy_websocket_handler callback: handles Erlang messages sent to the
%% websocket process. One clause per message kind; see the comment on each.

%% `init' while no player exists yet: look up the media info of the stream.
%% If it is not available, poll again in 2 seconds; otherwise create the
%% player and send the offer.
websocket_info(init, Req, #publish{player = undefined, stream_name = StreamName} = State) ->
    case live_stream:media_info(StreamName) of
        undefined ->
            erlang:send_after(2000, self(), init),
            {ok, Req, State};
        #media_info{} = MediaInfo ->
            initialize_with_mi(MediaInfo, Req, State)
    end;
%% `init' after the player was already created (a `#media_info{}' message
%% arrived between two polls): nothing to do. Initialising again would open a
%% second UDP socket and start a second ping timer while the first ones are
%% still alive.
websocket_info(init, Req, State) ->
    {ok, Req, State};
%% `ping' timer fired: cancel any still-registered timer, send whatever
%% webrtc_player:send_srs/1 returns to the current peer (presumably RTCP
%% sender reports, judging by the name - confirm in webrtc_player) and emit a
%% websocket ping. The next timer is armed when the pong arrives, in
%% websocket_handle/3.
websocket_info(ping, Req, #publish{timer = Old, player = Player, socket = Sock, peer_addr = Host, peer_port = Port} = State) ->
    Old =/= undefined andalso erlang:cancel_timer(Old),
    {ok, Responses, Player1} = webrtc_player:send_srs(Player),
    [gen_udp:send(Sock, Host, Port, Reply) || Reply <- Responses],
    {reply, {ping, <<>>}, Req, State#publish{timer = undefined, player = Player1}};
%% Incoming UDP packet: let the player process it, re-arm `{active, once}',
%% send the player's responses back to the sender and remember the sender as
%% the current peer.
websocket_info({udp, Sock, Host, Port, Packet}, Req, #publish{player = Player} = State) ->
    {ok, Responses, Player1} = webrtc_player:handle_udp(Host, Port, Packet, Player),
    inet:setopts(Sock, [{active, once}]),
    [gen_udp:send(Sock, Host, Port, Reply) || Reply <- Responses],
    {ok, Req, State#publish{player = Player1, peer_addr = Host, peer_port = Port}};
%% DTLS flight to transmit: send it to the current peer; a send failure
%% crashes the handler (`ok =' match).
websocket_info({dtls, flight, Data}, Req, #publish{socket = Sock, peer_addr = SrcHost, peer_port = SrcPort} = State) ->
    ok = gen_udp:send(Sock, SrcHost, SrcPort, Data),
    {ok, Req, State};
%% DTLS key material: pass it to the player.
websocket_info({dtls, key_material, SrtpParams}, Req, #publish{player = Player} = State) ->
    {ok, Player1} = webrtc_player:handle_key_material(SrtpParams, Player),
    {ok, Req, State#publish{player = Player1}};
% websocket_info({udp, Sock, Host, Port, <<?RTP_VERSION:2, _:14, _:48, SSRC:32, _/binary>> = SRTP}, Req,
% #publish{srtp_params = #srtp_params{server_write_SRTP_master_key = SKey, server_write_SRTP_master_salt = SSalt}, srtp_dec = undefined} = State) ->
% Dec = srtp:new_ctx(SSRC, srtpEncryptionAESCM, srtpAuthenticationSha1Hmac, SKey, SSalt)
%% Media info pushed by the stream before the player exists: initialise now.
websocket_info(#media_info{} = MediaInfo, Req, #publish{player = undefined} = State) ->
    initialize_with_mi(MediaInfo, Req, State);
%% Media info changed while playing: update the player and send a new offer
%% to the browser only if the offer actually differs from the previous one.
websocket_info(#media_info{} = MI, Req, #publish{player = Player} = State) ->
    OldOffer = webrtc_player:offer(Player),
    {ok, Player1} = webrtc_player:handle_media_info(MI, Player),
    NewOffer = webrtc_player:offer(Player1),
    if
        OldOffer == NewOffer ->
            {ok, Req, State#publish{player = Player1}};
        true ->
            {reply, {text, jsx:encode(NewOffer)}, Req, State#publish{player = Player1}}
    end;
%% Media frame from the stream: feed it to the player; if packets are
%% produced, send them to the peer and account their size on the session.
websocket_info(#video_frame{} = Frame, Req, #publish{player = Player, socket = Socket,
                                                      peer_addr = Addr, peer_port = Port} = State) ->
    case webrtc_player:handle_frame(Frame, Player) of
        {ok, Player1} ->
            {ok, Req, State#publish{player = Player1}};
        {ok, Packets, Player1} ->
            [gen_udp:send(Socket, Addr, Port, Packet) || Packet <- Packets],
            flu_session:add_bytes(State#publish.session_id, iolist_size(Packets)),
            {ok, Req, State#publish{player = Player1}}
    end;
%% `send_frame' timer (erlang:start_timer message shape): send the packets
%% returned by webrtc_player:send_next_frame/1 and account their size.
websocket_info({timeout, _, {send_frame, _DTS}}, Req, #publish{player = Player, socket = Socket,
                                                                peer_addr = Addr, peer_port = Port} = State) ->
    {ok, Packets, Player1} = webrtc_player:send_next_frame(Player),
    [gen_udp:send(Socket, Addr, Port, Packet) || Packet <- Packets],
    flu_session:add_bytes(State#publish.session_id, iolist_size(Packets)),
    {ok, Req, State#publish{player = Player1}};
websocket_info(refresh_auth, Req, State) ->
    flu_session:inprocess_refresh(),
    {ok, Req, State};
%% A bare binary message is forwarded to the browser as a text frame.
websocket_info(Bin, Req, State) when is_binary(Bin) ->
    {reply, {text, Bin}, Req, State};
%% Anything else is logged (depth-limited by ~P) and dropped.
websocket_info(Message, Req, State) ->
    events:warning("Unexpected message: ~P", [Message, 30]),
    {ok, Req, State}.
%% cowboy_websocket_handler callback: handles frames received from the browser.
%%
%% Text frames are decoded as JSON (maps) and passed to
%% webrtc_player:handle_json/2; when the player produces an answer it is sent
%% back JSON-encoded. A pong arms the next `ping' timer. Every other frame is
%% logged and ignored.
websocket_handle({text, Payload}, Req, State = #publish{player = Player0}) ->
    Decoded = jsx:decode(Payload, [return_maps]),
    case webrtc_player:handle_json(Decoded, Player0) of
        {ok, Player} ->
            {ok, Req, State#publish{player = Player}};
        {ok, Answer, Player} ->
            Encoded = jsx:encode(Answer),
            {reply, {text, Encoded}, Req, State#publish{player = Player}}
    end;
websocket_handle({pong, _}, Req, State = #publish{}) ->
    NextPing = erlang:send_after(?WS_TIMEOUT, self(), ping),
    {ok, Req, State#publish{timer = NextPing}};
websocket_handle(Frame, Req, State = #publish{}) ->
    events:info("Unknown ws message: ~p", [Frame]),
    {ok, Req, State}.
%% cowboy_websocket_handler callback invoked when the websocket closes.
%% Performs no explicit cleanup and always returns `ok'.
websocket_terminate(_, _, _) -> ok.
%% cowboy handler callback invoked when the HTTP handler ends.
%% Nothing to release here; always returns `ok'.
terminate(_Reason, _Req, _State) -> ok.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment