Mix.install([
{:plug_cowboy, "~> 2.6"}
])
Welcome to the world of television broadcasting! In this demo, we'll be building a high-quality live streaming server for a brand new TV station that's just starting up. The station, called 'ElixirTV', is a cutting-edge network that aims to revolutionize the way we consume TV content.
The founders of ElixirTV are a group of passionate tech enthusiasts who believe that the future of TV is in live streaming. They want to create a platform that allows viewers to watch their favorite shows and events, live and in real-time, from anywhere in the world. But they need your help to make it happen!
As the lead developer of ElixirTV, you've been tasked with building the server that will power the network's live streaming capabilities. Your challenge is to create a robust, scalable solution that can handle a large number of simultaneous viewers, without sacrificing video quality or performance.
Requirements
- We will need to generate a live stream based on a set of regular mp4 files
- We will append new videos or override the video list on the fly.
- Loop through the videos to form an endless stream of videos.
Here are the main components of this service
- Transcoder converts regular videos into VOD HLS format. Powered by FFmpeg.
- Scheduler rotates HLS segments in a circular queue.
- HLS Server for serving the HLS ready content over HTTP.
graph LR
scheduler(Scheduler)
server(Server)
live(Live Stream)
video(Video Files)
transcoder(Transcoder)
video --> transcoder
transcoder --> scheduler
scheduler --> server
server --> live
- Input: mp4 files
- Output: HLS files (x3 resolutions)
- Playlists
playlist.m3u8
- Segments
segment_xxx.ts
- Playlists
graph LR
sv(MP4 Video)
transcode[Transcode]
scale[Scale]
sv --> transcode
transcode --> scale
scale --> 720P
scale --> 1080P
scale --> 2160P
720P --> hls_v0
1080P --> hls_v1
2160P --> hls_v2
FFmpeg is a powerful open source video processing tool. https://ffmpeg.org/
defmodule Transcoder do
  @moduledoc """
  Converts regular video files into multi-variant VOD HLS output by shelling out to FFmpeg.

  For every input file a directory `hls_out/<video_id>/` is created and handed to FFmpeg
  as the destination for the variant playlists (`%v/playlist.m3u8`), the media segments
  (`%v/segment_NNN.ts`) and the master playlist.
  """

  # NOTE(review): `audio_bitrate` is declared per variant but the generated command
  # applies `-b:a 128k` to every audio stream; confirm which one is intended.
  @variants [
    %{
      name: "0",
      resolution: "1280x720",
      bitrate: 2800,
      h264_profile: "main",
      encoding_level: "3.1",
      audio_bitrate: 128
    },
    %{
      name: "1",
      resolution: "1920x1080",
      bitrate: 5000,
      h264_profile: "high",
      encoding_level: "4.0",
      audio_bitrate: 320
    },
    %{
      name: "2",
      resolution: "3840x2160",
      bitrate: 14000,
      h264_profile: "high10",
      encoding_level: "5.1",
      audio_bitrate: 640
    }
  ]

  @doc """
  Transcodes every path in `video_paths`, one after another.

  Returns the list of `transcode_video/1` results in the same order as the input.
  """
  @spec transcode_videos([String.t()]) :: [{:ok, map()} | {:error, map()}]
  def transcode_videos(video_paths) when is_list(video_paths) do
    Enum.map(video_paths, &transcode_video/1)
  end

  @doc """
  Transcodes a single video file into HLS.

  Returns `{:ok, info}` when FFmpeg exits with status 0, where `info` contains
  `:video_id`, `:video_path`, `:command`, `:video_output_dir` and `:command_output`.
  Returns `{:error, %{video_path: path, error: output}}` for any other exit status.

  Raises `File.Error` if the output directory cannot be created.
  """
  @spec transcode_video(String.t()) :: {:ok, map()} | {:error, map()}
  def transcode_video(video_path) when is_binary(video_path) do
    video_path
    |> assign_video_id()
    |> determine_command(video_path)
    |> execute()
  end

  # Builds the video id from the file's base name (without extension) and the current
  # Unix time in seconds.
  defp assign_video_id(video_filename) do
    ext_name = Path.extname(video_filename)
    file_basename = Path.basename(video_filename, ext_name)
    ts_suffix = DateTime.utc_now() |> DateTime.to_unix()
    "#{file_basename}-#{ts_suffix}"
  end

  # Creates the output directory and assembles the shell command for one video.
  defp determine_command(video_id, video_path) do
    variant_opts = determine_variant_opts()
    output_dir = "hls_out/#{video_id}"

    # mkdir_p!/1 also creates the parent "hls_out" directory on first use and does not
    # raise when the directory already exists (same file name within the same second).
    File.mkdir_p!(output_dir)

    # Every path is single-quoted because the command is run through `sh -c`: file names
    # with spaces or shell metacharacters must reach FFmpeg as one literal argument.
    cmd = """
    ffmpeg -i #{shell_quote(video_path)} \
    #{variant_opts} \
    -c:a aac -b:a 128k -ac 2 \
    -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 \
    -f hls -hls_time 6 -hls_segment_filename #{shell_quote("#{output_dir}/%v/segment_%03d.ts")} \
    -hls_playlist_type vod -hls_flags independent_segments \
    -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2" \
    -master_pl_name #{shell_quote("#{output_dir}/master.m3u8")} \
    #{shell_quote("#{output_dir}/%v/playlist.m3u8")}
    """

    %{command: cmd, video_id: video_id, video_path: video_path, output_dir: output_dir}
  end

  # Wraps `value` in single quotes for POSIX sh; an embedded single quote is emitted as
  # '\'' (close quote, escaped quote, reopen quote).
  defp shell_quote(value) do
    "'" <> String.replace(value, "'", "'\\''") <> "'"
  end

  # One line of encoder options per variant, each terminated by a shell line continuation.
  defp determine_variant_opts do
    Enum.map_join(@variants, fn variant -> variant_opts(variant) <> " \\\n" end)
  end

  defp variant_opts(%{name: name, resolution: resolution} = variant) do
    [width, height] = String.split(resolution, "x")

    Enum.join(
      [
        "-filter:v scale=w=#{width}:h=#{height}:force_original_aspect_ratio=decrease",
        "-c:v:#{name} libx264",
        "-b:v:#{name} #{variant.bitrate}k",
        "-s:v:#{name} #{resolution}",
        "-preset medium",
        "-profile:v:#{name} #{variant.h264_profile}",
        "-level:v:#{name} #{variant.encoding_level}",
        # to force the segment length to 6 seconds
        "-force_key_frames \"expr:gte(t,n_forced*6)\"",
        "-sc_threshold 0"
      ],
      " "
    )
  end

  # Runs the command through `sh -c`, capturing stderr together with stdout.
  defp execute(%{command: cmd, video_id: video_id, video_path: video_path, output_dir: output_dir}) do
    case System.cmd("sh", ["-c", cmd], stderr_to_stdout: true) do
      {output, 0} ->
        IO.puts("Transcoding for #{video_path} completed successfully.")

        {:ok,
         %{
           video_id: video_id,
           video_path: video_path,
           command: cmd,
           video_output_dir: output_dir,
           command_output: output
         }}

      {error_output, error_code} ->
        IO.puts("Transcoding for #{video_path} failed with error code: #{error_code}.")
        IO.puts("Error output: #{error_output}")
        {:error, %{video_path: video_path, error: error_output}}
    end
  end
end
# Example FFmpeg invocation: reads ${NAME}.mp4 and requests three H.264 outputs
# (640x360, 1280x720, 1920x1080) with AAC audio, packaged as VOD HLS with 6-second
# segments. Playlists, segments and the master playlist are written under
# hls_out/${VIDEO_ID}/ (%v is the variant placeholder).
ffmpeg -i ${NAME}.mp4 \
-filter:v scale=w=640:h=360:force_original_aspect_ratio=decrease \
-c:v:0 libx264 -b:v:0 800k -s:v:0 640x360 -preset medium -profile:v:0 baseline \
-level:v:0 3.0 -force_key_frames "expr:gte(t,n_forced*6)" -sc_threshold 0 \
-filter:v scale=w=1280:h=720:force_original_aspect_ratio=decrease \
-c:v:1 libx264 -b:v:1 2800k -s:v:1 1280x720 -preset medium -profile:v:1 main \
-level:v:1 3.1 -force_key_frames "expr:gte(t,n_forced*6)" -sc_threshold 0 \
-filter:v scale=w=1920:h=1080:force_original_aspect_ratio=decrease \
-c:v:2 libx264 -b:v:2 5000k -s:v:2 1920x1080 -preset medium -profile:v:2 high \
-level:v:2 4.0 -force_key_frames "expr:gte(t,n_forced*6)" -sc_threshold 0 \
-c:a aac -b:a 128k -ac 2 -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 \
-f hls -hls_time 6 -hls_playlist_type vod -hls_flags independent_segments \
-hls_segment_filename "hls_out/${VIDEO_ID}/%v/segment_%03d.ts" \
-var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2" -master_pl_name hls_out/${VIDEO_ID}/master.m3u8 \
hls_out/${VIDEO_ID}/%v/playlist.m3u8
Let's define some data structures that we'll use for passing data between modules.
defmodule Segment do
  @moduledoc """
  One HLS media segment as it travels between the playlist parser and the scheduler.

    * `:video_id`    - id of the video the segment belongs to
    * `:variant`     - name of the variant the segment was parsed for
    * `:filename`    - segment file name as listed in the variant playlist
    * `:duration_ms` - segment duration in milliseconds
    * `:first?`      - `true` for the first segment of a video
  """

  defstruct video_id: nil,
            variant: nil,
            filename: nil,
            duration_ms: nil,
            first?: nil
end
defmodule Video do
  @moduledoc """
  A transcoded video that can be handed to the scheduler.

    * `:id`       - the video id
    * `:segments` - NOTE(review): not populated anywhere in this file; presumably
      reserved for the video's segments
    * `:location` - directory holding the video's HLS output
  """

  defstruct id: nil, segments: nil, location: nil
end
graph LR;
segments(List of Segments)
playlist(HLS Playlist)
segments --encode--> playlist
playlist --decode--> segments
defmodule Playlist do
  @moduledoc """
  Encode/decode HLS media playlists.
  """

  @headers [
    "#EXTM3U",
    "#EXT-X-VERSION:3",
    "#EXT-X-TARGETDURATION:6"
  ]

  @doc """
  Takes a `Video` and a variant name.

  Reads `<location>/<variant>/playlist.m3u8` and returns the list of `Segment`s for that
  variant, in playlist order. The first segment is flagged with `first?: true`.

  Raises if the playlist file cannot be read, or if an `#EXTINF` line is not followed
  by a segment URI line.
  """
  def parse_segments(%{location: video_dir, id: video_id}, variant) do
    [video_dir, variant, "playlist.m3u8"]
    |> Path.join()
    |> File.stream!()
    # Trim first so that line terminators never leak into the filter or the file name.
    |> Stream.map(&String.trim/1)
    |> Stream.filter(&segment_line?/1)
    # After filtering, the stream alternates between "#EXTINF:..." and the segment URI.
    |> Stream.chunk_every(2)
    |> Stream.with_index()
    |> Enum.map(fn {["#EXTINF:" <> duration, filename], index} ->
      # Float.parse/1 stops at the comma that terminates the duration.
      {duration_s, _rest} = Float.parse(duration)

      %Segment{
        video_id: video_id,
        variant: variant,
        filename: filename,
        duration_ms: duration_s * 1000,
        # mark the first segment
        first?: index == 0
      }
    end)
  end

  # Keeps "#EXTINF" tags and URI lines. Blank lines and every other tag or comment are
  # dropped; letting them through would shift the tag/URI pairing in parse_segments/2.
  defp segment_line?(""), do: false
  defp segment_line?("#EXTINF" <> _rest), do: true
  defp segment_line?("#" <> _rest), do: false
  defp segment_line?(_uri), do: true

  @doc """
  Takes a `{segments, media_sequence_number}` tuple.

  Returns `{:ok, playlist}` where `playlist` is the media playlist text listing those
  segments, with `#EXT-X-MEDIA-SEQUENCE` set to `media_sequence_number`.
  """
  def generate({segments, media_sequence_number}) do
    msq_line = "#EXT-X-MEDIA-SEQUENCE:#{media_sequence_number}"
    segment_lines = Enum.flat_map(segments, &generate_segment_lines/1)
    playlist = [@headers, [msq_line], segment_lines] |> Enum.concat() |> Enum.join("\n")
    {:ok, playlist}
  end

  # The first segment of a video is preceded by a discontinuity tag.
  defp generate_segment_lines(%{first?: true} = segment) do
    ["#EXT-X-DISCONTINUITY" | media_lines(segment)]
  end

  defp generate_segment_lines(segment), do: media_lines(segment)

  defp media_lines(segment) do
    [
      # RFC 8216 defines the tag as `#EXTINF:<duration>,[<title>]`: the comma is
      # required even when there is no title.
      "#EXTINF:#{segment.duration_ms / 1000},",
      "#{segment.video_id}/#{segment.filename}"
    ]
  end
end
Core module for maintaining live stream state.
- Importing videos to the program queue
Scheduler.load(videos)
- Retrieving the current live playlist
Scheduler.get_playlist(variant)
graph LR
scheduler(Scheduler)
server(Server)
player(Player)
hls1 --> scheduler
hls2 --> scheduler
hls3 --> scheduler
scheduler --> server
server --> player
%{
"variant_name" => %{
segment_buffer: ["segment_1", "segment_2", "segment_3"],
current_buffer_start_ts: 1684983730000, # timestamp in milliseconds - head of the buffer queue
program_cursor: 0, # current cursor position in the program queue
current_global_sequence_number: 0, # value used in media playlist header
program_queue: [
# one list of segments per video
]
}
}
defmodule Scheduler do
  @moduledoc """
  Handles the server state.

  The state is a map keyed by variant name. Each value holds:

    * `:segment_buffer` - segments that are currently live or about to be
    * `:current_buffer_start_ts` - time (ms) at which the head of the buffer started
    * `:program_cursor` - index in `:program_queue` of the program loaded last
      (`-1` when nothing has been loaded yet)
    * `:current_global_sequence_number` - number of segments dropped from the buffer
      so far; used as the playlist's media sequence number
    * `:program_queue` - one list of segments per video

  A `:rotate` message is processed once per second: expired segments are dropped from
  the head of each buffer and the next program is appended when the buffer runs low.
  """
  use GenServer

  @variants ["0", "1", "2"]
  @min_buffer_size 15
  @playlist_segment_count 10

  # Client API

  @doc """
  Starts the scheduler, registered under the module name.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Returns `{:ok, playlist}` with the current live media playlist of `variant`.

  Only the variants `"0"`, `"1"` and `"2"` are accepted.
  """
  def get_playlist(variant) when variant in @variants do
    __MODULE__
    |> GenServer.call({:get_current_segments, variant, @playlist_segment_count})
    |> Playlist.generate()
  end

  @doc """
  Loads `videos` into the program queue of every variant and returns `:ok`.

  ## Options

    * `:mode` - `:replace` (default) swaps the whole queue, `:append` adds to its end
  """
  def load(videos, opts \\ []) do
    GenServer.call(__MODULE__, {:load, videos, opts})
  end

  # Server

  @impl true
  def init(_) do
    schedule_next()

    init_state =
      for v <- @variants, into: %{} do
        {v,
         %{
           segment_buffer: [],
           current_buffer_start_ts: now(),
           # -1 so that the first program picked is index 0, exactly as after a
           # `:replace` load; starting at 0 made an `:append` on a fresh server
           # begin with the second video.
           program_cursor: -1,
           current_global_sequence_number: 0,
           program_queue: []
         }}
      end

    {:ok, init_state}
  end

  @impl true
  def handle_info(:rotate, state) do
    current_ts = now()
    # Rotation is cheap and pure, so it runs inline instead of in spawned tasks.
    new_state = Map.new(state, &rotate_variant_buffer(&1, current_ts))
    schedule_next()
    {:noreply, new_state}
  end

  @impl true
  def handle_call({:load, videos, opts}, _from, state) do
    mode = Keyword.get(opts, :mode, :replace)

    # Parsing reads playlist files from disk, so the variants are loaded concurrently.
    new_state =
      state
      |> Task.async_stream(&load_variant(&1, videos, mode))
      |> Map.new(fn {:ok, entry} -> entry end)

    {:reply, :ok, new_state}
  end

  def handle_call({:get_current_segments, variant, count}, _from, state) do
    sub_state = state[variant]
    segments = Enum.slice(sub_state.segment_buffer, 0, count)
    media_sequence_number = sub_state.current_global_sequence_number
    {:reply, {segments, media_sequence_number}, state}
  end

  @doc """
  Schedules the next `:rotate` message to the calling process, one second from now.
  """
  def schedule_next do
    Process.send_after(self(), :rotate, 1000)
  end

  # Helpers

  @doc """
  Returns the current UTC time as a Unix timestamp in milliseconds.
  """
  def now do
    DateTime.utc_now() |> DateTime.to_unix(:millisecond)
  end

  defp rotate_variant_buffer({v, sub_state}, current_ts) do
    sub_state = restart_clock_if_idle(sub_state, current_ts)

    {popped, rest} =
      pop_expired_segments(
        sub_state.segment_buffer,
        current_ts,
        sub_state.current_buffer_start_ts
      )

    {v,
     sub_state
     |> Map.put(:segment_buffer, rest)
     |> update_sequence_number(popped)
     |> update_buffer_start_ts(popped)
     |> maybe_load_next_program()}
  end

  # While the buffer is empty nothing is playing, so the buffer clock must not keep
  # its old value: otherwise the segments loaded next would be measured against a
  # start time in the past and expire immediately.
  defp restart_clock_if_idle(%{segment_buffer: []} = sub_state, current_ts) do
    %{sub_state | current_buffer_start_ts: current_ts}
  end

  defp restart_clock_if_idle(sub_state, _current_ts), do: sub_state

  # Splits the buffer into {expired, remaining}. A segment is expired once its end
  # time (start of the buffer plus all durations up to and including it) has passed.
  defp pop_expired_segments([], _, _) do
    {[], []}
  end

  defp pop_expired_segments([segment | rest] = segments, now, start_ts) do
    end_ts = start_ts + segment.duration_ms

    if end_ts < now do
      {popped, remaining} = pop_expired_segments(rest, now, end_ts)
      {[segment | popped], remaining}
    else
      {[], segments}
    end
  end

  # Appends the next program (wrapping around the queue) when the buffer is running low.
  defp maybe_load_next_program(
         %{program_queue: program_queue, segment_buffer: buffer} = sub_state
       )
       when length(buffer) <= @min_buffer_size and program_queue != [] do
    next_cursor = rem(sub_state.program_cursor + 1, length(program_queue))
    next_program = Enum.at(program_queue, next_cursor)

    %{
      sub_state
      | segment_buffer: Enum.concat(buffer, next_program),
        program_cursor: next_cursor
    }
  end

  defp maybe_load_next_program(sub_state), do: sub_state

  defp update_sequence_number(sub_state, popped) do
    # each popped segment will increase current_global_sequence by 1
    new_sequence_number = sub_state.current_global_sequence_number + Enum.count(popped)
    %{sub_state | current_global_sequence_number: new_sequence_number}
  end

  # Moves the buffer clock forward by the total duration of the dropped segments.
  defp update_buffer_start_ts(sub_state, popped) do
    time_shift_ms = popped |> Enum.map(& &1.duration_ms) |> Enum.sum()
    new_buffer_start_ts = sub_state.current_buffer_start_ts + time_shift_ms
    %{sub_state | current_buffer_start_ts: new_buffer_start_ts}
  end

  # :replace swaps the queue and rewinds the cursor; the segments already buffered
  # are left in place.
  defp load_variant({v, sub_state}, videos, :replace) do
    new_videos = Enum.map(videos, &Playlist.parse_segments(&1, v))
    {v, %{sub_state | program_queue: new_videos, program_cursor: -1}}
  end

  defp load_variant({v, sub_state}, videos, :append) do
    new_videos = Enum.map(videos, &Playlist.parse_segments(&1, v))
    {v, %{sub_state | program_queue: Enum.concat(sub_state.program_queue, new_videos)}}
  end
end
GET /playlist.m3u8
=> static master playlist
GET /:variant/playlist.m3u8
=> live variant playlist
GET /:variant/:video_id/:media_segment_name.ts
=> serves video files from local file system
defmodule Server do
  @moduledoc """
  HTTP front end of the live stream.

    * `GET /playlist.m3u8` - static master playlist
    * `GET /:variant/playlist.m3u8` - live media playlist of one variant
    * `GET /:variant/:video_id/:segment_id.ts` - media segment read from `./hls_out`

  Every other request is answered with 404.
  """
  use Plug.Router
  use Plug.ErrorHandler

  # Variants listed in the master playlist below.
  @variants ["0", "1", "2"]

  plug(Plug.Logger)
  plug(:match)
  plug(:dispatch)

  # BANDWIDTH of variant 2 covers its encoded bitrate (14000k video + 128k audio);
  # the previously advertised 13000000 was lower than what the encoder is asked for.
  get "/playlist.m3u8" do
    conn
    |> put_resp_header("content-type", "application/vnd.apple.mpegurl")
    |> send_resp(200, """
    #EXTM3U
    #EXT-X-VERSION:3
    #EXT-X-STREAM-INF:BANDWIDTH=2960000,RESOLUTION=1280x720
    0/playlist.m3u8
    #EXT-X-STREAM-INF:BANDWIDTH=5360000,RESOLUTION=1920x1080
    1/playlist.m3u8
    #EXT-X-STREAM-INF:BANDWIDTH=14128000,RESOLUTION=3840x2160
    2/playlist.m3u8
    """)
  end

  get "/:variant/playlist.m3u8" do
    # Scheduler.get_playlist/1 only accepts known variants; answer 404 for anything
    # else instead of letting the request crash into a 500.
    if variant in @variants do
      {:ok, playlist} = Scheduler.get_playlist(variant)

      conn
      |> put_resp_header("content-type", "application/vnd.apple.mpegurl")
      |> send_resp(200, playlist)
    else
      send_resp(conn, 404, "Not Found")
    end
  end

  get "/:variant/:video_id/:segment_id.ts" do
    path = "./hls_out/#{video_id}/#{variant}/#{segment_id}.ts"

    # Path parameters are URL-decoded, so they may contain "/" or be "..". They are
    # checked before touching the file system to keep requests inside ./hls_out.
    if Enum.all?([variant, video_id, segment_id], &safe_component?/1) and File.regular?(path) do
      conn
      |> put_resp_header("content-type", "video/MP2T")
      |> send_file(200, path)
    else
      conn |> send_resp(404, "Not Found")
    end
  end

  match _ do
    send_resp(conn, 404, "Not Found")
  end

  # A component is safe when it names a single directory entry: not empty, not a
  # relative reference, and free of path separators and NUL bytes.
  defp safe_component?(name) do
    name not in ["", ".", ".."] and not String.contains?(name, ["/", "\\", <<0>>])
  end
end
Make sure ffmpeg is installed and available from your system shell.
Start a new iex session in a terminal. Note: the directory in which you start that iex session will be used as the working directory of this server.
iex --sname test --cookie mycookie
Connect this livebook session to that running iex session as attached node.
Put the mp4 files under the working directory, then run the snippets below.
defmodule MediaLoader do
  @moduledoc """
  Wraps the transcoding and scheduling steps into higher level API functions.
  """

  @doc """
  Transcodes `video_files` and loads every successfully transcoded video into the
  scheduler. Failed transcodes are left out.

  `opts` is passed to `Scheduler.load/2` unchanged.
  """
  def load_files(video_files, opts \\ []) do
    results = Transcoder.transcode_videos(video_files)
    videos = prepare_load_files(results)
    Scheduler.load(videos, opts)
  end

  # Turns the successful transcoding results into `Video` structs; anything that is
  # not an `{:ok, ...}` result is dropped.
  defp prepare_load_files(transcoding_results) do
    Enum.flat_map(transcoding_results, fn
      {:ok, %{video_id: id, video_output_dir: dir}} -> [%Video{id: id, location: dir}]
      _failed -> []
    end)
  end

  @doc """
  Loads videos that were transcoded earlier, identified by their ids.

  Each video is expected under `hls_out/<video_id>`. `opts` is passed to
  `Scheduler.load/2` unchanged.
  """
  def load_videos(video_ids, opts \\ []) do
    videos = prepare_load_videos(video_ids)
    Scheduler.load(videos, opts)
  end

  # Builds a `Video` per id, pointing at its directory under "hls_out".
  defp prepare_load_videos(video_ids) do
    Enum.map(video_ids, fn id ->
      %Video{id: id, location: "hls_out/#{id}"}
    end)
  end
end
defmodule Demo do
  @moduledoc """
  Entry point that boots the demo's supervision tree.
  """

  @doc """
  Starts a supervisor named `Server.Supervisor` running the `Scheduler` and the HTTP
  server (`Server` on port 4000).

  Returns the result of `Supervisor.start_link/2`.
  """
  def start() do
    # Children are started in list order. The Scheduler comes first because the HTTP
    # handlers call it; with the listener first, a request arriving during boot
    # would hit a Scheduler process that does not exist yet.
    children = [
      Scheduler,
      {Plug.Cowboy, scheme: :http, plug: Server, port: 4000}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Server.Supervisor)
  end
end
# Boot the supervision tree.
Demo.start()

# Transcode the local mp4 files, then replace the program queue with the results.
video_files = ["video_0.mp4", "video_1.mp4", "video_2.mp4", "video_3.mp4"]

video_files |> MediaLoader.load_files(mode: :replace)

# Alternative: load videos that were transcoded earlier, by id.
# video_ids = ~w[
# video_0-1685148754
# video_1-1685148761
# video_2-1685148768
# video_3-1685148776
# ]
# MediaLoader.load_videos(video_ids, mode: :replace)
Run the elixir snippet below to load video files into the server. You can choose between different loading modes which will have different effects on the queue.
Visit http://localhost:4000/playlist.m3u8
# Load already transcoded videos by id, replacing the program queue.
video_ids = [
  "lg-uhd-paris-1684820748",
  "lg-uhd-walking-in-the-air-saint-petersburg-1684820902"
]

video_ids |> MediaLoader.load_videos(mode: :replace)

# Transcode the local mp4 files, then replace the program queue with the results.
video_files = ["video_0.mp4", "video_1.mp4", "video_2.mp4", "video_3.mp4"]

video_files |> MediaLoader.load_files(mode: :replace)
# # For terminating the server
# pid = Process.whereis(Server.Supervisor)
# Process.exit(pid, :normal)