Skip to content

Instantly share code, notes, and snippets.

@baines
Last active September 14, 2016 22:23
Show Gist options
  • Save baines/b960725baf8a60854258 to your computer and use it in GitHub Desktop.
Save baines/b960725baf8a60854258 to your computer and use it in GitHub Desktop.
3 C++11 threaded HLS downloader / player thingies
#if 0
g++ $0 -std=c++11 -pthread -lcurl -o hls-dl
exit
#endif
#include <thread>
#include <mutex>
#include <algorithm>
#include <vector>
#include <array>
#include <string>
#include <iostream>
#include <sstream>
#include <curl/curl.h>
#include <stdio.h>
#include <libgen.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <cassert>
#include <atomic>
// Value handed to libcurl as CURLOPT_USERAGENT for every request made below.
static const char USER_AGENT[] = "crappy hls player";
// File-wide using-directives: the code below relies on unqualified std:: and
// std::chrono:: names (string, thread, system_clock, milliseconds, ...).
using namespace std;
using namespace std::chrono;
/// Fixed-size ring of N slots used to hand values between threads without a
/// mutex. Only the two indices are atomic; the slots are plain objects.
///
/// write() does a separate load and store on idx_end, so it is only safe with
/// a single writer thread. When the writer catches up with the reader it
/// advances idx_start itself, i.e. the oldest unread element is dropped, which
/// means at most N-1 elements are ever readable.
template<class T, std::size_t N>
struct atomic_buf {
    /// Returns the oldest unread element. Precondition: available() != 0.
    /// The element stays in the buffer until end_read() is called with it.
    T& begin_read(){
        assert(available());
        return data[idx_start];
    }

    /// Releases an element obtained from begin_read(). The slot is only reset
    /// if idx_start still points at it; if write() already moved idx_start past
    /// this slot (overflow), the slot now belongs to the writer and is left alone.
    void end_read(const T& t){
        std::size_t i = &t - &data[0];
        if(idx_start.compare_exchange_strong(i, (i + 1) % data.size())){
            data[i] = T();
        }
    }

    /// Copies `t` into the next slot and publishes it. Returns a reference to
    /// the stored copy (callers keep pointers to it).
    T& write(const T& t){
        std::size_t i = idx_end.load(), j = (i + 1) % data.size();
        data[i] = t;
        idx_end.store(j);
        // Buffer full: push the read position forward, discarding the oldest entry.
        idx_start.compare_exchange_strong(j, (j + 1) % data.size());
        return data[i];
    }

    /// Number of elements currently readable. Each index is loaded exactly once
    /// so the result is computed from one consistent pair of values.
    int available(){
        const std::size_t s = idx_start.load();
        const std::size_t e = idx_end.load();
        return static_cast<int>(s <= e ? e - s : N - (s - e));
    }

    // Explicitly zeroed: before C++20 a default-constructed std::atomic holds an
    // indeterminate value unless the object has static storage duration.
    std::atomic_size_t idx_start{0}, idx_end{0};
    std::array<T, N> data;
};
/// One segment download in flight: where it comes from, the bytes received so
/// far, when it was queued and whether the transfer has finished.
struct PendingSeg {
    /// Empty placeholder (used for unused ring-buffer slots).
    PendingSeg() = default;

    /// Starts tracking a download of `s`, timestamped with the current time.
    /// Intentionally not `explicit`: the download thread passes a plain string
    /// where a PendingSeg is expected and relies on the implicit conversion.
    PendingSeg(const std::string& s)
    : url(s), data(), start_time(std::chrono::system_clock::now()), done(false){}

    std::string url;                                   // segment URL as listed in the playlist
    std::string data;                                  // payload accumulated by the write callback
    std::chrono::system_clock::time_point start_time;  // when the download was queued
    bool done = false;                                 // initialised so a default-constructed object is well defined
};
// Playlist entries already seen; m3u_callback consults it to skip duplicates.
vector<string> old_urls;
// Newly discovered segment URLs: written by m3u_callback, drained by dl_thread_main.
atomic_buf<string, 4> new_urls;
// Downloads in flight, in the order they were queued. libcurl is given pointers
// to these slots as write-callback and private data.
atomic_buf<PendingSeg, 8> pending_segs;
// Completed segment payloads: written by dl_thread_main, drained by main.
atomic_buf<string, 8> segs;
// Serialises everything written to std::cout through print().
std::mutex stdout_lock;

/// Prints a single value as "== value ==" followed by a newline.
template<class T>
void print(const T& t){
    std::lock_guard<std::mutex> l(stdout_lock);
    std::cout << "== " << t << " ==\n";
}

/// Recursion terminator: writes the last value and the closing " ==" marker.
/// Does not lock; callers must already hold stdout_lock.
template<class T>
void do_print(const T& t){
    std::cout << t << " ==\n";
}

/// Writes `t`, then recurses on the remaining values. Arguments are taken by
/// const reference so strings are not copied again at every recursion level.
template<class T, class... Args>
void do_print(const T& t, const Args&... args){
    std::cout << t;
    do_print(args...);
}

/// Prints all values back to back as one "== ... ==" line, holding stdout_lock
/// for the whole line so output from different threads does not interleave.
template<class T, class... Args>
void print(const T& t, const Args&... args){
    std::lock_guard<std::mutex> l(stdout_lock);
    std::cout << "== " << t;
    do_print(args...);
}
size_t seg_callback(char* ptr, size_t size, size_t nmemb, void* arg){
size_t sz = size * nmemb;
PendingSeg* seg = reinterpret_cast<PendingSeg*>(arg);
seg->data.append(ptr, ptr+sz);
return sz;
}
// Unparsed playlist text carried over between callback invocations (the tail
// of a chunk that did not end on a newline).
string m3u_str;
// Number of previously unseen URLs found since m3u_thread_main last reset it.
int new_url_count = 0;

/// libcurl write callback for the playlist transfer.
///
/// Appends the chunk to m3u_str, parses every complete line, and queues each
/// URL that is not yet in old_urls into new_urls. Lines starting with '#' and
/// blank lines are ignored; a trailing '\r' (CRLF playlists) is stripped so it
/// does not end up inside the URL. A final line without a newline is kept in
/// m3u_str until more data arrives.
///
/// @return the number of bytes consumed (always size * nmemb).
size_t m3u_callback(char* ptr, size_t size, size_t nmemb, void* arg){
    (void)arg; // no user data is registered for this callback
    const size_t sz = size * nmemb;
    m3u_str.append(ptr, sz);
    istringstream stream(m3u_str);
    string s;
    while(getline(stream, s)){
        if(stream.eof()){
            // getline hit end-of-data before a newline: `s` is an incomplete
            // line, so stash it and wait for the next chunk.
            print("EOF");
            m3u_str = s;
            return sz;
        }
        if(!s.empty() && s.back() == '\r') s.pop_back();
        if(s.empty() || s[0] == '#') continue;
        if(find(old_urls.begin(), old_urls.end(), s) == old_urls.end()){
            print("Adding url: ", s);
            new_urls.write(s);
            old_urls.push_back(s);
            ++new_url_count;
        }
    }
    m3u_str.clear();
    // Bound the duplicate-detection history: drop the oldest half once it grows.
    if(old_urls.size() > 64){
        print("Cleaning old urls.");
        old_urls.erase(old_urls.begin(), old_urls.begin()+32);
    }
    return sz;
}
void m3u_thread_main(const char* m3u_url){
CURL* m3u = curl_easy_init();
curl_easy_setopt(m3u, CURLOPT_TCP_NODELAY, 1);
curl_easy_setopt(m3u, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(m3u, CURLOPT_FAILONERROR, 1);
curl_easy_setopt(m3u, CURLOPT_USERAGENT, USER_AGENT);
curl_easy_setopt(m3u, CURLOPT_WRITEFUNCTION, &m3u_callback);
curl_easy_setopt(m3u, CURLOPT_URL, m3u_url);
int empty_count = 0;
while(true){
auto start_time = system_clock::now();
curl_easy_perform(m3u);
if(new_url_count == 0){
print("No new segments.");
if(++empty_count > 5){
print("Nothing new after ", empty_count, " checks. Assuming stream's dead.");
exit(0);
}
} else {
empty_count = 0;
}
new_url_count = 0;
this_thread::sleep_until(start_time + seconds(2));
}
}
void dl_thread_main(const char* m3u_url){
string baseurl(dirname(strdup(m3u_url)));
bool first_iteration = true;
CURLM* curl_multi = curl_multi_init();
while(true){
auto start_time = system_clock::now();
while(new_urls.available()){
string s;
const string& url = new_urls.begin_read();
if(url.find('/') == string::npos){
s = baseurl + "/" + url;
} else {
s = url;
}
print("Adding Segment [", url, "] to batch.");
CURL* seg = curl_easy_init();
curl_easy_setopt(seg, CURLOPT_TCP_NODELAY, 1);
curl_easy_setopt(seg, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(seg, CURLOPT_FAILONERROR, 1);
curl_easy_setopt(seg, CURLOPT_USERAGENT, USER_AGENT);
curl_easy_setopt(seg, CURLOPT_WRITEFUNCTION, &seg_callback);
curl_easy_setopt(seg, CURLOPT_WRITEHEADER, stderr);
curl_easy_setopt(seg, CURLOPT_HEADERFUNCTION, &fwrite);
curl_easy_setopt(seg, CURLOPT_URL, s.c_str());
auto& p = pending_segs.write(url);
curl_easy_setopt(seg, CURLOPT_WRITEDATA, &p);
curl_easy_setopt(seg, CURLOPT_PRIVATE, &p);
curl_multi_add_handle(curl_multi, seg);
new_urls.end_read(url);
}
int running_handles;
curl_multi_perform(curl_multi, &running_handles);
int remaining_msgs;
while(CURLMsg* msg = curl_multi_info_read(curl_multi, &remaining_msgs)){
if(msg->msg != CURLMSG_DONE) continue;
PendingSeg* ptr;
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &ptr);
ptr->done = true;
milliseconds dl_time = duration_cast<milliseconds>(
system_clock::now() - ptr->start_time
);
string url_copy = ptr->url;
print("Got Segment [", url_copy, "] in ", dl_time.count(), "ms. [", curl_easy_strerror(msg->data.result), "].");
curl_multi_remove_handle(curl_multi, msg->easy_handle);
curl_easy_cleanup(msg->easy_handle);
}
while(pending_segs.available()){
auto& i = pending_segs.begin_read();
if(i.done){
if(i.data.empty()){
print("WARNING: 0 bytes received.");
}
segs.write(i.data);
pending_segs.end_read(i);
} else {
break;
}
}
this_thread::sleep_until(start_time + milliseconds(25));
}
}
int main(int argc, char** argv){
if(argc < 2 || strcmp(argv[1], "") == 0){
fprintf(stderr, "%s: Error: No URL specified\n.", argv[0]);
return 1;
}
fprintf(stderr, "%s: m3u URL: %s\n", argv[0], argv[1]);
int err = 0;
const char* player = argc >= 3 ? argv[2] : "mpv -";
FILE* cfd = popen(player, "w");
setvbuf(cfd, NULL, _IONBF, 0);
if((err = fcntl(fileno(cfd), F_SETPIPE_SZ, 1024)) == -1){
perror(argv[0]);
return 1;
} else {
fprintf(stderr, "+-+-+-+ Pipe capacity: %d +-+-+-+\n", err);
}
int icfd = fileno(cfd);
curl_global_init(CURL_GLOBAL_ALL);
thread m3u_thread(m3u_thread_main, argv[1]);
thread dl_thread(dl_thread_main, argv[1]);
while(true){
while(segs.available()){
auto& i = segs.begin_read();
write(icfd, i.data(), i.size());
segs.end_read(i);
}
this_thread::sleep_for(milliseconds(50));
}
return 0;
}
#include <thread>
#include <mutex>
#include <algorithm>
#include <vector>
#include <array>
#include <list>
#include <string>
#include <iostream>
#include <sstream>
#include <curl/curl.h>
#include <stdio.h>
#include <libgen.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <condition_variable>
#include <cassert>
// Value handed to libcurl as CURLOPT_USERAGENT for every request made below.
static const char USER_AGENT[] = "crappy hls player";
// File-wide using-directives: the code below relies on unqualified std:: and
// std::chrono:: names (string, list, system_clock, milliseconds, ...).
using namespace std;
using namespace std::chrono;
/// Bookkeeping for one segment download: its URL, the chunks buffered so far,
/// the time the download was queued and a completion flag.
struct PendingSeg {
    /// Begins tracking `segment_url`; the start time is taken at construction.
    /// Left implicit on purpose so existing call sites can pass a plain string.
    PendingSeg(const std::string& segment_url)
        : url{segment_url}
        , data{}
        , start_time{std::chrono::system_clock::now()}
        , done{false}
    {}

    std::string url;                                   // URL as listed in the playlist
    std::vector<std::string> data;                     // one entry per chunk delivered by libcurl
    std::chrono::system_clock::time_point start_time;  // when the download was queued
    bool done;                                         // set once the transfer has finished
};
// Every playlist entry seen so far, in discovery order; appended to by m3u_callback.
vector<string> urls;
// Downloads in flight, in the order dl_thread_main queued them.
list<PendingSeg> pending_segs;
// Chunks of segment data waiting to be written to the player by main.
list<string> segs;
// Held around the playlist fetch and around the code that reads urls / new_url_count.
mutex m3u_lock;
// Held whenever segs is modified (and while pending_segs is updated).
mutex seg_lock;
// Serialises output written through print().
mutex stdout_lock;
// Only referenced from a commented-out notify in m3u_thread_main.
condition_variable m3u_cond;
// Signalled when data has been queued in segs; main waits on it.
condition_variable seg_cond;
/// Prints a single value as "== value ==" followed by a newline.
template<class T>
void print(const T& t){
    std::lock_guard<std::mutex> l(stdout_lock);
    std::cout << "== " << t << " ==\n";
}

/// Recursion terminator: writes the last value and the closing " ==" marker.
/// Does not lock; callers must already hold stdout_lock.
template<class T>
void do_print(const T& t){
    std::cout << t << " ==\n";
}

/// Writes `t`, then recurses on the remaining values. Arguments are taken by
/// const reference so strings are not copied again at every recursion level.
template<class T, class... Args>
void do_print(const T& t, const Args&... args){
    std::cout << t;
    do_print(args...);
}

/// Prints all values back to back as one "== ... ==" line, holding stdout_lock
/// for the whole line so output from different threads does not interleave.
template<class T, class... Args>
void print(const T& t, const Args&... args){
    std::lock_guard<std::mutex> l(stdout_lock);
    std::cout << "== " << t;
    do_print(args...);
}
size_t seg_callback(char* ptr, size_t size, size_t nmemb, void* arg){
size_t sz = size * nmemb;
PendingSeg* seg = reinterpret_cast<PendingSeg*>(arg);
if(seg == &pending_segs.front()){
seg_lock.lock();
segs.emplace_back(ptr, ptr+sz);
seg_lock.unlock();
} else {
seg->data.emplace_back(ptr, ptr+sz);
}
return sz;
}
// Unparsed playlist text carried over between callback invocations (the tail
// of a chunk that did not end on a newline).
string m3u_str;
// Number of URLs appended to `urls` that dl_thread_main has not yet queued.
int new_url_count = 0;

/// libcurl write callback for the playlist transfer.
///
/// Appends the chunk to m3u_str, parses every complete line, and appends each
/// URL that is not yet in `urls`. Lines starting with '#' and blank lines are
/// ignored; a trailing '\r' (CRLF playlists) is stripped so it does not end up
/// inside the URL. A final line without a newline is kept in m3u_str until
/// more data arrives.
///
/// @return the number of bytes consumed (always size * nmemb).
size_t m3u_callback(char* ptr, size_t size, size_t nmemb, void* arg){
    (void)arg; // no user data is registered for this callback
    const size_t sz = size * nmemb;
    m3u_str.append(ptr, sz);
    istringstream stream(m3u_str);
    string s;
    while(getline(stream, s)){
        if(stream.eof()){
            // Incomplete last line: stash it and wait for the next chunk.
            m3u_str = s;
            return sz;
        }
        if(!s.empty() && s.back() == '\r') s.pop_back();
        if(s.empty() || s[0] == '#') continue;
        if(find(urls.begin(), urls.end(), s) == urls.end()){
            print("Adding url: ", s);
            urls.push_back(move(s)); // `s` is refilled by the next getline
            ++new_url_count;
        }
    }
    m3u_str.clear();
    return sz;
}
void m3u_thread_main(const char* m3u_url){
CURL* m3u = curl_easy_init();
curl_easy_setopt(m3u, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(m3u, CURLOPT_FAILONERROR, 1);
curl_easy_setopt(m3u, CURLOPT_USERAGENT, USER_AGENT);
curl_easy_setopt(m3u, CURLOPT_WRITEFUNCTION, &m3u_callback);
curl_easy_setopt(m3u, CURLOPT_URL, m3u_url);
while(true){
auto start_time = system_clock::now();
m3u_lock.lock();
curl_easy_perform(m3u);
//m3u_cond.notify_all();
m3u_lock.unlock();
this_thread::sleep_until(start_time + seconds(3));
}
}
void dl_thread_main(const char* m3u_url){
string baseurl(dirname(strdup(m3u_url)));
bool first_iteration = true;
CURLM* curl_multi = curl_multi_init();
while(true){
auto start_time = system_clock::now();
m3u_lock.lock();
if(new_url_count != 0){
if(first_iteration){
new_url_count = min(new_url_count, 3);
first_iteration = false;
}
for(int i = urls.size() - new_url_count; i < urls.size(); ++i){
string s;
if(urls[i].find('/') == string::npos){
s = baseurl + "/" + urls[i];
} else {
s = urls[i];
}
print("Adding Segment [", urls[i], "] to batch.");
CURL* seg = curl_easy_init();
curl_easy_setopt(seg, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(seg, CURLOPT_FAILONERROR, 1);
curl_easy_setopt(seg, CURLOPT_USERAGENT, USER_AGENT);
curl_easy_setopt(seg, CURLOPT_WRITEFUNCTION, &seg_callback);
curl_easy_setopt(seg, CURLOPT_WRITEHEADER, stderr);
curl_easy_setopt(seg, CURLOPT_HEADERFUNCTION, &fwrite);
curl_easy_setopt(seg, CURLOPT_URL, s.c_str());
seg_lock.lock();
pending_segs.emplace_back(urls[i]);
curl_easy_setopt(seg, CURLOPT_WRITEDATA, &pending_segs.back());
curl_easy_setopt(seg, CURLOPT_PRIVATE, &pending_segs.back());
seg_lock.unlock();
curl_multi_add_handle(curl_multi, seg);
}
new_url_count = 0;
}
m3u_lock.unlock();
int running_handles;
curl_multi_perform(curl_multi, &running_handles);
int remaining_msgs;
while(CURLMsg* msg = curl_multi_info_read(curl_multi, &remaining_msgs)){
if(msg->msg != CURLMSG_DONE) continue;
seg_lock.lock();
PendingSeg* ptr;
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &ptr);
ptr->done = true;
if(ptr == &pending_segs.front()){
assert(ptr->data.empty());
}
string url = ptr->url;
milliseconds dl_time = duration_cast<milliseconds>(
system_clock::now() - ptr->start_time
);
auto i = pending_segs.begin(), j = pending_segs.end();
while(i != j){
if(i->done){
for(auto&& str : i->data){
segs.push_back(move(str));
}
++i;
pending_segs.pop_front();
} else {
break;
}
}
if(!pending_segs.empty()){
for(auto&& str : pending_segs.front().data){
segs.push_back(move(str));
}
pending_segs.front().data.clear();
}
seg_cond.notify_all();
seg_lock.unlock();
print("Got Segment [", url, "] in ", dl_time.count(), "ms.");
curl_multi_remove_handle(curl_multi, msg->easy_handle);
curl_easy_cleanup(msg->easy_handle);
}
this_thread::sleep_until(start_time + milliseconds(100));
}
}
int main(int argc, char** argv){
if(argc < 2 || strcmp(argv[1], "") == 0){
fprintf(stderr, "%s: Error: No URL specified\n.", argv[0]);
return 1;
}
fprintf(stderr, "%s: m3u URL: %s\n", argv[0], argv[1]);
int err = 0;
FILE* cfd = popen("gst-launch-0.10 -v fdsrc ! mpegtsdemux name=d ! queue ! ffdec_h264 ! autovideosink d. ! queue ! decodebin2 ! autoaudiosink", "w");
// FILE* cfd = popen("mplayer --no-border --quiet -", "w");
setvbuf(cfd, NULL, _IONBF, 0);
if((err = fcntl(fileno(cfd), F_SETPIPE_SZ, 4 * 1024)) == -1){
perror(argv[0]);
return 1;
} else {
fprintf(stderr, "+-+-+-+ Pipe capacity: %d +-+-+-+\n", err);
}
int icfd = fileno(cfd);
curl_global_init(CURL_GLOBAL_ALL);
thread m3u_thread(m3u_thread_main, argv[1]);
thread dl_thread(dl_thread_main, argv[1]);
while(true){
unique_lock<mutex> l(seg_lock);
if(segs.empty()){
seg_cond.wait(l, [&]{ return !segs.empty(); });
}
list<string>::const_iterator i = segs.begin(), j = segs.end();
size_t count = distance(i, j);
l.unlock();
while(count--){
write(icfd, i->data(), i->size());
l.lock();
++i;
segs.pop_front();
l.unlock();
}
}
return 0;
}
#include <thread>
#include <mutex>
#include <algorithm>
#include <vector>
#include <array>
#include <list>
#include <string>
#include <iostream>
#include <sstream>
#include <curl/curl.h>
#include <stdio.h>
#include <libgen.h>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <condition_variable>
using namespace std;
using namespace std::chrono;
// Ring of playlist entries already seen; m3u_callback stores new entries at
// url_idx and wraps around at the end of the array.
array<string, 4096> urls;
// Next slot of `urls` to be written.
int url_idx = 0;
// Chunks of segment data queued by seg_callback for main to write to the player.
list<vector<char>> segs;
// Held whenever segs is modified.
mutex seg_lock;
// Serialises output written through print().
mutex stdout_lock;
// Signalled by seg_callback whenever a chunk is queued; main waits on it.
condition_variable cv;
/// Prints a single value as "== value ==" followed by a newline.
template<class T>
void print(const T& t){
    std::lock_guard<std::mutex> l(stdout_lock);
    std::cout << "== " << t << " ==\n";
}

/// Recursion terminator: writes the last value and the closing " ==" marker.
/// Does not lock; callers must already hold stdout_lock.
template<class T>
void do_print(const T& t){
    std::cout << t << " ==\n";
}

/// Writes `t`, then recurses on the remaining values. Arguments are taken by
/// const reference so strings are not copied again at every recursion level.
template<class T, class... Args>
void do_print(const T& t, const Args&... args){
    std::cout << t;
    do_print(args...);
}

/// Prints all values back to back as one "== ... ==" line, holding stdout_lock
/// for the whole line so output from different threads does not interleave.
template<class T, class... Args>
void print(const T& t, const Args&... args){
    std::lock_guard<std::mutex> l(stdout_lock);
    std::cout << "== " << t;
    do_print(args...);
}
/// libcurl write callback for segment transfers.
/// Sets the bool that `arg` points to, queues a copy of the received bytes in
/// `segs` and signals `cv`, all while holding seg_lock for the queue update.
/// Returns the number of bytes consumed, which is everything delivered.
size_t seg_callback(char* ptr, size_t size, size_t nmemb, void* arg){
    const size_t byte_count = size * nmemb;
    bool* const got_data = static_cast<bool*>(arg);
    *got_data = true;
    {
        lock_guard<mutex> guard(seg_lock);
        segs.emplace_back(ptr, ptr + byte_count);
        cv.notify_one();
    }
    return byte_count;
}
// Unparsed playlist text carried over between callback invocations (the tail
// of a chunk that did not end on a newline).
string m3u_str;

/// libcurl write callback for the playlist transfer.
///
/// Appends the chunk to m3u_str, parses every complete line, and stores each
/// URL that is not yet in `urls` at url_idx (wrapping around), incrementing
/// the int that `arg` points to for every URL added. Lines starting with '#'
/// and blank lines are ignored; a trailing '\r' (CRLF playlists) is stripped.
///
/// After parsing, m3u_str holds only the incomplete last line, if any. It used
/// to keep the whole text whenever the data ended on a newline, so the buffer
/// grew and was re-parsed on every call.
///
/// @return the number of bytes consumed (always size * nmemb).
size_t m3u_callback(char* ptr, size_t size, size_t nmemb, void* arg){
    const size_t sz = size * nmemb;
    int* count = static_cast<int*>(arg);
    m3u_str.append(ptr, sz);
    istringstream stream(m3u_str);
    string s, tail;
    while(s.clear(), getline(stream, s)){
        if(stream.eof()){
            // Incomplete last line: keep it for the next chunk.
            tail = move(s);
            break;
        }
        if(!s.empty() && s.back() == '\r') s.pop_back();
        if(s.empty() || s[0] == '#') continue;
        if(find(urls.begin(), urls.end(), s) == urls.end()){
            print("Adding url: ", s);
            urls[url_idx] = move(s);
            url_idx = (url_idx + 1) % urls.size();
            (*count)++;
        }
    }
    m3u_str = move(tail);
    return sz;
}
void dl_thread_main(const char* appname, const char* m3u_url){
int empty = 0, count = 0, j = -1;
bool playing = true;
string baseurl(dirname(strdup(m3u_url)));
CURL* m3u = curl_easy_init(), *seg;
curl_easy_setopt(m3u, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(m3u, CURLOPT_FAILONERROR, 1);
curl_easy_setopt(m3u, CURLOPT_USERAGENT, "crappy hls player");
seg = curl_easy_duphandle(m3u);
curl_easy_setopt(m3u, CURLOPT_WRITEFUNCTION, &m3u_callback);
curl_easy_setopt(m3u, CURLOPT_WRITEDATA, &count);
curl_easy_setopt(m3u, CURLOPT_URL, m3u_url);
curl_easy_setopt(seg, CURLOPT_WRITEFUNCTION, &seg_callback);
curl_easy_setopt(seg, CURLOPT_WRITEDATA, &playing);
curl_easy_setopt(seg, CURLOPT_WRITEHEADER, stderr);
curl_easy_setopt(seg, CURLOPT_HEADERFUNCTION, &fwrite);
while(true) {
count = 0;
auto tp = system_clock::now();
print("Getting playlist...");
curl_easy_perform(m3u);
if(count == 0){
if(playing && ++empty > 5){
print(empty, " 404's in a row; assuming stream is dead.");
exit(1);
}
print("No new segments.");
} else {
empty = 0;
int i = j < 0 ? max(url_idx - 3, 0): j;
j = url_idx;
for(; i != j; i = (i+1) % urls.size()){
/*if(system_clock::now() >= tp + seconds(4)){
j = i;
break;
}*/
string s;
if(urls[i].find('/') == string::npos){
s = baseurl + "/" + urls[i];
} else {
s = urls[i];
}
print("Getting Segment [", urls[i], "]");
auto seg_start = system_clock::now();
curl_easy_setopt(seg, CURLOPT_URL, s.c_str());
curl_easy_perform(seg);
print("Got Segment [", urls[i], "] in ", duration_cast<milliseconds>
(system_clock::now() - seg_start).count(), "ms.");
}
}
auto secs = playing ? seconds(4) : milliseconds(800);
this_thread::sleep_until(tp + secs);
}
}
int main(int argc, char** argv){
if(argc < 2 || strcmp(argv[1], "") == 0){
fprintf(stderr, "%s: Error: No URL specified\n.", argv[0]);
return 1;
}
fprintf(stderr, "%s: m3u URL: %s\n", argv[0], argv[1]);
int err = 0;
FILE* cfd = popen("gst-launch-0.10 -v fdsrc ! mpegtsdemux name=d ! queue ! ffdec_h264 ! autovideosink d. ! queue ! decodebin2 ! autoaudiosink", "w");
setvbuf(cfd, NULL, _IONBF, 0);
if((err = fcntl(fileno(cfd), F_SETPIPE_SZ, 64 * 1024)) == -1){
perror(argv[0]);
return 1;
} else {
fprintf(stderr, "+-+-+-+ Pipe capacity: %d +-+-+-+\n", err);
}
int icfd = fileno(cfd);
curl_global_init(CURL_GLOBAL_ALL);
thread dl_thread(dl_thread_main, argv[0], argv[1]);
while(true){
unique_lock<mutex> l(seg_lock);
if(segs.empty()){
cv.wait(l, [&]{ return !segs.empty(); });
}
list<vector<char>>::const_iterator i = segs.begin(), j = segs.end();
l.unlock();
while(i != j){
write(icfd, i->data(), i->size());
++i;
l.lock();
segs.pop_front();
l.unlock();
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment