Last active
December 27, 2017 03:55
-
-
Save 303248153/b29d49c72236a652f7deda4e170f00f1 to your computer and use it in GitHub Desktop.
Even shitter than etw, worse than the worst api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sudo apt-get install liblttng-ctl-dev | |
// g++ -Wall -Wextra --std=c++11 CaptureLttngEventFromCoreClr.cpp -llttng-ctl && ./a.out | |
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! | |
// ! ! | |
// ! Don't copy this code to your project, ! | |
// ! it contains GPL licensed code from lttng-tools (lttng-viewer-abi.h).! | |
// ! ! | |
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! | |
#include <lttng/lttng.h> | |
#include <sys/socket.h> | |
#include <arpa/inet.h> | |
#include <signal.h> | |
#include <unistd.h> | |
#include <cstring> | |
#include <iostream> | |
#include <thread> | |
#include <chrono> | |
#include <vector> | |
#include <unordered_map> | |
#include <algorithm> | |
#include <functional> | |
namespace { | |
const char* SessionName = "example-session"; | |
const char* SessionUrl = "net://127.0.0.1"; | |
const std::string EventName = "DotNETRuntime:EventSource"; | |
const unsigned int LiveSessionInterval = 1000000; // 1s | |
bool LoopFlag = true; | |
void sigint_handler(int) { | |
LoopFlag = false; | |
} | |
#define LTTNG_VIEWER_PATH_MAX 4096 | |
#define LTTNG_VIEWER_NAME_MAX 255 | |
#define LTTNG_VIEWER_HOST_NAME_MAX 64 | |
/* Flags in reply to get_next_index and get_packet. */ | |
enum { | |
/* New metadata is required to read this packet. */ | |
LTTNG_VIEWER_FLAG_NEW_METADATA = (1 << 0), | |
/* New stream got added to the trace. */ | |
LTTNG_VIEWER_FLAG_NEW_STREAM = (1 << 1), | |
}; | |
enum lttng_viewer_command { | |
LTTNG_VIEWER_CONNECT = 1, | |
LTTNG_VIEWER_LIST_SESSIONS = 2, | |
LTTNG_VIEWER_ATTACH_SESSION = 3, | |
LTTNG_VIEWER_GET_NEXT_INDEX = 4, | |
LTTNG_VIEWER_GET_PACKET = 5, | |
LTTNG_VIEWER_GET_METADATA = 6, | |
LTTNG_VIEWER_GET_NEW_STREAMS = 7, | |
LTTNG_VIEWER_CREATE_SESSION = 8, | |
LTTNG_VIEWER_DETACH_SESSION = 9, | |
}; | |
enum lttng_viewer_attach_return_code { | |
LTTNG_VIEWER_ATTACH_OK = 1, /* The attach command succeeded. */ | |
LTTNG_VIEWER_ATTACH_ALREADY = 2, /* A viewer is already attached. */ | |
LTTNG_VIEWER_ATTACH_UNK = 3, /* The session ID is unknown. */ | |
LTTNG_VIEWER_ATTACH_NOT_LIVE = 4, /* The session is not live. */ | |
LTTNG_VIEWER_ATTACH_SEEK_ERR = 5, /* Seek error. */ | |
LTTNG_VIEWER_ATTACH_NO_SESSION = 6, /* No viewer session created. */ | |
}; | |
enum lttng_viewer_next_index_return_code { | |
LTTNG_VIEWER_INDEX_OK = 1, /* Index is available. */ | |
LTTNG_VIEWER_INDEX_RETRY = 2, /* Index not yet available. */ | |
LTTNG_VIEWER_INDEX_HUP = 3, /* Index closed (trace destroyed). */ | |
LTTNG_VIEWER_INDEX_ERR = 4, /* Unknow error. */ | |
LTTNG_VIEWER_INDEX_INACTIVE = 5, /* Inactive stream beacon. */ | |
LTTNG_VIEWER_INDEX_EOF = 6, /* End of index file. */ | |
}; | |
enum lttng_viewer_get_packet_return_code { | |
LTTNG_VIEWER_GET_PACKET_OK = 1, | |
LTTNG_VIEWER_GET_PACKET_RETRY = 2, | |
LTTNG_VIEWER_GET_PACKET_ERR = 3, | |
LTTNG_VIEWER_GET_PACKET_EOF = 4, | |
}; | |
enum lttng_viewer_get_metadata_return_code { | |
LTTNG_VIEWER_METADATA_OK = 1, | |
LTTNG_VIEWER_NO_NEW_METADATA = 2, | |
LTTNG_VIEWER_METADATA_ERR = 3, | |
}; | |
enum lttng_viewer_connection_type { | |
LTTNG_VIEWER_CLIENT_COMMAND = 1, | |
LTTNG_VIEWER_CLIENT_NOTIFICATION = 2, | |
}; | |
enum lttng_viewer_seek { | |
/* Receive the trace packets from the beginning. */ | |
LTTNG_VIEWER_SEEK_BEGINNING = 1, | |
/* Receive the trace packets from now. */ | |
LTTNG_VIEWER_SEEK_LAST = 2, | |
}; | |
enum lttng_viewer_new_streams_return_code { | |
LTTNG_VIEWER_NEW_STREAMS_OK = 1, /* If new streams are being sent. */ | |
LTTNG_VIEWER_NEW_STREAMS_NO_NEW = 2, /* If no new streams are available. */ | |
LTTNG_VIEWER_NEW_STREAMS_ERR = 3, /* Error. */ | |
LTTNG_VIEWER_NEW_STREAMS_HUP = 4, /* Session closed. */ | |
}; | |
enum lttng_viewer_create_session_return_code { | |
LTTNG_VIEWER_CREATE_SESSION_OK = 1, | |
LTTNG_VIEWER_CREATE_SESSION_ERR = 2, | |
}; | |
enum lttng_viewer_detach_session_return_code { | |
LTTNG_VIEWER_DETACH_SESSION_OK = 1, | |
LTTNG_VIEWER_DETACH_SESSION_UNK = 2, | |
LTTNG_VIEWER_DETACH_SESSION_ERR = 3, | |
}; | |
struct lttng_viewer_session { | |
uint64_t id; | |
uint32_t live_timer; | |
uint32_t clients; | |
uint32_t streams; | |
char hostname[LTTNG_VIEWER_HOST_NAME_MAX]; | |
char session_name[LTTNG_VIEWER_NAME_MAX]; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_stream { | |
uint64_t id; | |
uint64_t ctf_trace_id; | |
uint32_t metadata_flag; | |
char path_name[LTTNG_VIEWER_PATH_MAX]; | |
char channel_name[LTTNG_VIEWER_NAME_MAX]; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_cmd { | |
uint64_t data_size; /* data size following this header */ | |
uint32_t cmd; /* enum lttcomm_relayd_command */ | |
uint32_t cmd_version; /* command version */ | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_CONNECT payload. | |
*/ | |
struct lttng_viewer_connect { | |
/* session ID assigned by the relay for command connections */ | |
uint64_t viewer_session_id; | |
uint32_t major; | |
uint32_t minor; | |
uint32_t type; /* enum lttng_viewer_connection_type */ | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_LIST_SESSIONS payload. | |
*/ | |
struct lttng_viewer_list_sessions { | |
uint32_t sessions_count; | |
char session_list[]; /* struct lttng_viewer_session */ | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_ATTACH_SESSION payload. | |
*/ | |
struct lttng_viewer_attach_session_request { | |
uint64_t session_id; | |
uint64_t offset; /* unused for now */ | |
uint32_t seek; /* enum lttng_viewer_seek */ | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_attach_session_response { | |
/* enum lttng_viewer_attach_return_code */ | |
uint32_t status; | |
uint32_t streams_count; | |
/* struct lttng_viewer_stream */ | |
char stream_list[]; | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_GET_NEXT_INDEX payload. | |
*/ | |
struct lttng_viewer_get_next_index { | |
uint64_t stream_id; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_index { | |
uint64_t offset; | |
uint64_t packet_size; | |
uint64_t content_size; | |
uint64_t timestamp_begin; | |
uint64_t timestamp_end; | |
uint64_t events_discarded; | |
uint64_t stream_id; | |
uint32_t status; /* enum lttng_viewer_next_index_return_code */ | |
uint32_t flags; /* LTTNG_VIEWER_FLAG_* */ | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_GET_PACKET payload. | |
*/ | |
struct lttng_viewer_get_packet { | |
uint64_t stream_id; | |
uint64_t offset; | |
uint32_t len; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_trace_packet { | |
uint32_t status; /* enum lttng_viewer_get_packet_return_code */ | |
uint32_t len; | |
uint32_t flags; /* LTTNG_VIEWER_FLAG_* */ | |
char data[]; | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_GET_METADATA payload. | |
*/ | |
struct lttng_viewer_get_metadata { | |
uint64_t stream_id; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_metadata_packet { | |
uint64_t len; | |
uint32_t status; /* enum lttng_viewer_get_metadata_return_code */ | |
char data[]; | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_GET_NEW_STREAMS payload. | |
*/ | |
struct lttng_viewer_new_streams_request { | |
uint64_t session_id; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_new_streams_response { | |
/* enum lttng_viewer_new_streams_return_code */ | |
uint32_t status; | |
uint32_t streams_count; | |
/* struct lttng_viewer_stream */ | |
char stream_list[]; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_create_session_response { | |
/* enum lttng_viewer_create_session_return_code */ | |
uint32_t status; | |
} __attribute__ ((__packed__)); | |
/* | |
* LTTNG_VIEWER_DETACH_SESSION payload. | |
*/ | |
struct lttng_viewer_detach_session_request { | |
uint64_t session_id; | |
} __attribute__ ((__packed__)); | |
struct lttng_viewer_detach_session_response { | |
/* enum lttng_viewer_detach_session_return_code */ | |
uint32_t status; | |
} __attribute__ ((__packed__)); | |
int sendall(int fd, const void* buf, std::size_t size) { | |
std::size_t pos = 0; | |
while (pos < size) { | |
auto ret = send(fd, | |
reinterpret_cast<const char*>(buf) + pos, size - pos, 0); | |
if (ret <= 0) { | |
return -1; | |
} | |
pos += static_cast<std::size_t>(ret); | |
} | |
return 0; | |
} | |
int recvall(int fd, void* buf, std::size_t size) { | |
std::size_t pos = 0; | |
while (pos < size) { | |
auto ret = recv(fd, | |
reinterpret_cast<char*>(buf) + pos, size - pos, 0); | |
if (ret <= 0) { | |
return -1; | |
} | |
pos += static_cast<std::size_t>(ret); | |
} | |
return 0; | |
} | |
template <class T> | |
int sendcmd(int fd, std::uint32_t type, const T& body) { | |
lttng_viewer_cmd cmd = {}; | |
cmd.data_size = htobe64(sizeof(T)); | |
cmd.cmd = htobe32(type); | |
if (sendall(fd, &cmd, sizeof(cmd)) < 0) { | |
return -1; | |
} | |
if (sendall(fd, &body, sizeof(body)) < 0) { | |
return -1; | |
} | |
return 0; | |
} | |
int sendcmd(int fd, std::uint32_t type) { | |
lttng_viewer_cmd cmd = {}; | |
cmd.data_size = htobe64(0); | |
cmd.cmd = htobe32(type); | |
if (sendall(fd, &cmd, sizeof(cmd)) < 0) { | |
return -1; | |
} | |
return 0; | |
} | |
int relayd_connect(int fd, std::uint64_t& viewer_session_id) { | |
lttng_viewer_connect body = {}; | |
body.major = htobe32(2); | |
body.minor = htobe32(9); | |
body.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); | |
if (sendcmd(fd, LTTNG_VIEWER_CONNECT, body) < 0) { | |
return -1; | |
} | |
if (recvall(fd, &body, sizeof(body)) < 0) { | |
return -1; | |
} | |
viewer_session_id = be64toh(body.viewer_session_id); | |
return 0; | |
} | |
int relayd_create_viewer_session(int fd) { | |
if (sendcmd(fd, LTTNG_VIEWER_CREATE_SESSION) < 0) { | |
return -1; | |
} | |
lttng_viewer_create_session_response response = {}; | |
if (recvall(fd, &response, sizeof(response)) < 0) { | |
return -1; | |
} | |
if (be32toh(response.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { | |
return -2; | |
} | |
return 0; | |
} | |
int relayd_list_sessions(int fd, std::vector<lttng_viewer_session>& sessions) { | |
if (sendcmd(fd, LTTNG_VIEWER_LIST_SESSIONS) < 0) { | |
return -1; | |
} | |
lttng_viewer_list_sessions response = {}; | |
if (recvall(fd, &response, sizeof(response)) < 0) { | |
return -1; | |
} | |
auto session_count = be32toh(response.sessions_count); | |
sessions.resize(session_count); | |
if (recvall(fd, sessions.data(), sessions.size() * sizeof(sessions[0])) < 0) { | |
return -1; | |
} | |
for (auto& session : sessions) { | |
session.id = be64toh(session.id); | |
session.live_timer = be32toh(session.live_timer); | |
session.clients = be32toh(session.clients); | |
session.streams = be32toh(session.streams); | |
} | |
return 0; | |
} | |
int relayd_attach_session( | |
int fd, | |
std::uint64_t session_id, | |
std::unordered_map<std::uint64_t, lttng_viewer_stream>& streams) { | |
lttng_viewer_attach_session_request request = {}; | |
request.session_id = htobe64(session_id); | |
request.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); | |
if (sendcmd(fd, LTTNG_VIEWER_ATTACH_SESSION, request) < 0) { | |
return -1; | |
} | |
lttng_viewer_attach_session_response response = {}; | |
if (recvall(fd, &response, sizeof(response)) < 0) { | |
return -1; | |
} | |
response.status = be32toh(response.status); | |
if (response.status != LTTNG_VIEWER_ATTACH_OK) { | |
std::cerr << "attach status: " << response.status << std::endl; | |
return -2; | |
} | |
lttng_viewer_stream stream = {}; | |
auto stream_count = be32toh(response.streams_count); | |
for (std::size_t x = 0; x < stream_count; ++x) { | |
if (recvall(fd, &stream, sizeof(stream)) < 0) { | |
return -1; | |
} | |
stream.id = be64toh(stream.id); | |
stream.ctf_trace_id = be64toh(stream.ctf_trace_id); | |
stream.metadata_flag = be32toh(stream.metadata_flag); | |
streams[stream.id] = stream; | |
} | |
return 0; | |
} | |
int relayd_get_new_streams( | |
int fd, | |
std::uint64_t session_id, | |
std::unordered_map<std::uint64_t, lttng_viewer_stream>& streams, | |
const std::function<void(lttng_viewer_stream&)>& callback) { | |
lttng_viewer_new_streams_request request = {}; | |
request.session_id = htobe64(session_id); | |
if (sendcmd(fd, LTTNG_VIEWER_GET_NEW_STREAMS, request) < 0) { | |
return -1; | |
} | |
lttng_viewer_new_streams_response response = {}; | |
if (recvall(fd, &response, sizeof(response)) < 0) { | |
return -1; | |
} | |
auto status = be32toh(response.status); | |
if (status == LTTNG_VIEWER_NEW_STREAMS_NO_NEW) { | |
return 0; | |
} | |
if (status != LTTNG_VIEWER_NEW_STREAMS_OK) { | |
return -1; | |
} | |
lttng_viewer_stream stream = {}; | |
auto stream_count = be32toh(response.streams_count); | |
for (std::size_t x = 0; x < stream_count; ++x) { | |
if (recvall(fd, &stream, sizeof(stream)) < 0) { | |
return -1; | |
} | |
stream.id = be64toh(stream.id); | |
stream.ctf_trace_id = be64toh(stream.ctf_trace_id); | |
stream.metadata_flag = be32toh(stream.metadata_flag); | |
streams[stream.id] = stream; | |
callback(stream); | |
} | |
return 0; | |
} | |
int relayd_get_next_index(int fd, std::uint64_t stream_id, lttng_viewer_index& index) { | |
lttng_viewer_get_next_index request = {}; | |
request.stream_id = htobe64(stream_id); | |
if (sendcmd(fd, LTTNG_VIEWER_GET_NEXT_INDEX, request) < 0) { | |
return -1; | |
} | |
if (recvall(fd, &index, sizeof(index)) < 0) { | |
return -1; | |
} | |
index.offset = be64toh(index.offset); | |
index.packet_size = be64toh(index.packet_size); | |
index.content_size = be64toh(index.content_size); | |
index.timestamp_begin = be64toh(index.timestamp_begin); | |
index.timestamp_end = be64toh(index.timestamp_end); | |
index.events_discarded = be64toh(index.events_discarded); | |
index.stream_id = be64toh(index.stream_id); | |
index.status = be32toh(index.status); | |
index.flags = be32toh(index.flags); | |
return 0; | |
} | |
int relayd_get_metadata( | |
int fd, | |
std::uint64_t stream_id, | |
lttng_viewer_metadata_packet& packet, | |
std::string& packet_data) { | |
lttng_viewer_get_metadata request = {}; | |
request.stream_id = htobe64(stream_id); | |
if (sendcmd(fd, LTTNG_VIEWER_GET_METADATA, request) < 0) { | |
return -1; | |
} | |
if (recvall(fd, &packet, sizeof(packet)) < 0) { | |
return -1; | |
} | |
packet.status = be32toh(packet.status); | |
packet.len = be64toh(packet.len); | |
if (packet.status == LTTNG_VIEWER_METADATA_OK) { | |
if (packet.len == 0) { | |
return -2; | |
} | |
packet_data.resize(packet.len); | |
if (recvall(fd, &packet_data.front(), packet_data.size()) < 0) { | |
return -1; | |
} | |
} | |
return 0; | |
} | |
int relayd_get_packet( | |
int fd, | |
std::uint64_t stream_id, | |
const lttng_viewer_index& index, | |
lttng_viewer_trace_packet& packet, | |
std::string& packet_data) { | |
lttng_viewer_get_packet request = {}; | |
request.stream_id = htobe64(stream_id); | |
request.offset = htobe64(index.offset); | |
// no document mention it's bits but live_test.c | |
request.len = htobe32(index.content_size / CHAR_BIT); | |
if (sendcmd(fd, LTTNG_VIEWER_GET_PACKET, request) < 0) { | |
return -1; | |
} | |
if (recvall(fd, &packet, sizeof(packet)) < 0) { | |
return -1; | |
} | |
packet.status = be32toh(packet.status); | |
packet.len = be32toh(packet.len); | |
packet.flags = be32toh(packet.flags); | |
if (packet.status == LTTNG_VIEWER_GET_PACKET_OK) { | |
if (packet.len == 0) { | |
return -2; | |
} | |
packet_data.resize(packet.len); | |
if (recvall(fd, &packet_data.front(), packet_data.size()) < 0) { | |
return -1; | |
} | |
} | |
return 0; | |
} | |
void dump(const void* ptr, std::size_t size) { | |
auto* buf = reinterpret_cast<const unsigned char*>(ptr); | |
for (std::size_t i=0; i<size; i += 16) { | |
printf("%06x: ", static_cast<unsigned int>(i)); | |
for (std::size_t j=0; j<16; j++) { | |
if (i+j < size) { | |
printf("%02x ", buf[i+j]); | |
} else { | |
printf(" "); | |
} | |
} | |
printf(" "); | |
for (std::size_t j=0; j<16; j++) { | |
if (i+j < size) { | |
printf("%c", isprint(buf[i+j]) ? buf[i+j] : '.'); | |
} | |
} | |
printf("\n"); | |
} | |
} | |
void dump_only_text(const void* ptr, std::size_t size) { | |
auto* buf = reinterpret_cast<const unsigned char*>(ptr); | |
for (std::size_t i=0; i<size; i += 72) { | |
printf("%06x: ", static_cast<unsigned int>(i)); | |
for (std::size_t j=0; j<72; j++) { | |
if (i+j < size) { | |
printf("%c", isprint(buf[i+j]) ? buf[i+j] : '.'); | |
} | |
} | |
printf("\n"); | |
} | |
} | |
} | |
int main() { | |
// start processes, won't replace exists | |
system("lttng-sessiond --daemonize"); | |
system("lttng-relayd --daemonize"); | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
// create new session | |
for (std::size_t x = 0; x < 100; ++x) { | |
lttng_destroy_session(SessionName); | |
std::this_thread::sleep_for(std::chrono::milliseconds(15)); | |
} | |
int ret = lttng_create_session_live(SessionName, SessionUrl, LiveSessionInterval); | |
if (ret != 0) { | |
std::cerr << "lttng_create_session: " << lttng_strerror(ret) << std::endl; | |
return -1; | |
} | |
// create handle from session | |
lttng_domain domain = {}; | |
domain.type = LTTNG_DOMAIN_UST; | |
lttng_handle* handle = lttng_create_handle(SessionName, &domain); | |
if (handle == nullptr) { | |
std::cerr << "lttng_create_handle: " << lttng_strerror(ret) << std::endl; | |
return -1; | |
} | |
// enable event | |
lttng_event event = {}; | |
event.type = LTTNG_EVENT_TRACEPOINT; | |
memcpy(event.name, EventName.c_str(), EventName.size()); | |
event.loglevel_type = LTTNG_EVENT_LOGLEVEL_ALL; | |
event.loglevel = -1; | |
ret = lttng_enable_event_with_exclusions(handle, &event, nullptr, nullptr, 0, nullptr); | |
if (ret < 0) { | |
std::cerr << "lttng_enable_event_with_exclusions: " << lttng_strerror(ret) << std::endl; | |
return -1; | |
} | |
// add context | |
lttng_event_context contextPid = {}; | |
contextPid.ctx = LTTNG_EVENT_CONTEXT_VPID; | |
ret = lttng_add_context(handle, &contextPid, nullptr, nullptr); | |
if (ret < 0) { | |
std::cerr << "lttng_add_context: " << lttng_strerror(ret) << std::endl; | |
return -1; | |
} | |
// start tracing | |
ret = lttng_start_tracing(SessionName); | |
if (ret < 0) { | |
std::cerr << "lttng_start_tracing: " << lttng_strerror(ret) << std::endl; | |
return -1; | |
} | |
// connect to relayd | |
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); | |
if (fd < 0) { | |
perror("socket"); | |
return -1; | |
} | |
sockaddr_in address = {}; | |
address.sin_addr.s_addr = inet_addr("127.0.0.1"); | |
address.sin_family = AF_INET; | |
address.sin_port = htons(5344); | |
ret = connect(fd, (sockaddr*)&address, sizeof(address)); | |
if (ret < 0) { | |
perror("connect"); | |
return -1; | |
} | |
// init connection | |
std::uint64_t session_id = 0; | |
if ((ret = relayd_connect(fd, session_id)) < 0) { | |
std::cerr << "relayd_connect failed: " << ret << std::endl; | |
return -1; | |
} | |
std::cout << "relayd_connect success, session_id: " << session_id << std::endl; | |
// create viewer session | |
if ((ret = relayd_create_viewer_session(fd)) < 0) { | |
std::cerr << "relayd_create_viewer_session failed: " << ret << std::endl; | |
return -1; | |
} | |
std::cout << "relayd_create_viewer_session success" << std::endl; | |
// list sessions and find out which to attach | |
std::vector<lttng_viewer_session> sessions; | |
if ((ret = relayd_list_sessions(fd, sessions)) < 0) { | |
std::cerr << "relayd_list_sessions failed: " << ret << std::endl; | |
return -1; | |
} | |
std::cout << "relayd_list_sessions success, session count: " << sessions.size() << std::endl; | |
std::uint64_t attach_to_session_id = 0; | |
for (const auto& session : sessions) { | |
std::cout << "session: id = " << session.id << | |
", live_timer = " << session.live_timer << | |
", hostname = " << session.hostname << | |
", session_name = " << session.session_name << std::endl; | |
// only live session can attach | |
if (strcmp(SessionName, session.session_name) == 0 && session.live_timer > 0) { | |
attach_to_session_id = session.id; | |
} | |
} | |
if (attach_to_session_id == 0) { | |
std::cerr << "no session to attach" << std::endl; | |
return -1; | |
} | |
// attach to session | |
std::unordered_map<std::uint64_t, lttng_viewer_stream> streams; | |
if ((ret = relayd_attach_session(fd, attach_to_session_id, streams)) < 0) { | |
std::cerr << "relayd_attach_session failed: " << ret << std::endl; | |
return -1; | |
} | |
std::cout << "relayd_attach_session success" << std::endl; | |
// event receive loop | |
signal(SIGINT, sigint_handler); | |
lttng_viewer_index index = {}; | |
lttng_viewer_metadata_packet metadata = {}; | |
lttng_viewer_trace_packet packet = {}; | |
std::string packet_data; | |
std::vector<std::uint64_t> remove_stream_ids; | |
std::function<void(lttng_viewer_stream&)> callback = [](lttng_viewer_stream& stream) { | |
std::cout << "new stream: id = " << stream.id << | |
", ctf_trace_id = " << stream.ctf_trace_id << | |
", metadata_flag = " << stream.metadata_flag << | |
", path_name = " << stream.path_name << | |
", channel_name = " << stream.channel_name << std::endl; | |
}; | |
std::cout << "enter event receive loop" << std::endl; | |
bool require_new_metadata = false; | |
bool require_new_streams = false; | |
while (LoopFlag) { | |
// get new streams | |
if (require_new_streams || streams.empty()) { | |
if ((ret = relayd_get_new_streams(fd, | |
attach_to_session_id, streams, callback)) < 0) { | |
std::cerr << "relayd_get_new_streams failed: " << ret << std::endl; | |
return -1; | |
} | |
require_new_streams = false; | |
} | |
// get metadata | |
if (require_new_metadata) { | |
for (const auto& pair : streams) { | |
const auto& stream = pair.second; | |
if (stream.metadata_flag == 0) { | |
continue; | |
} | |
while (LoopFlag) { | |
if ((ret = relayd_get_metadata(fd, stream.id, metadata, packet_data)) < 0) { | |
std::cerr << "relayd_get_metadata failed: " << ret << std::endl; | |
return -1; | |
} | |
if (metadata.status == LTTNG_VIEWER_NO_NEW_METADATA) { | |
break; | |
} | |
if (metadata.status != LTTNG_VIEWER_METADATA_OK) { | |
remove_stream_ids.emplace_back(stream.id); | |
break; | |
} | |
// shrink packet data (only for dump) | |
auto index_not_zero = packet_data.find_last_not_of('\x00'); | |
if (index_not_zero != std::string::npos) { | |
packet_data.resize(index_not_zero + 1); | |
} | |
// print metadata | |
std::cout << "relayd_get_metadata success: len = " << packet_data.size() << std::endl; | |
dump_only_text(packet_data.c_str(), packet_data.size()); | |
} | |
} | |
require_new_metadata = false; | |
} | |
// get events | |
for (const auto& pair : streams) { | |
const auto& stream = pair.second; | |
if (stream.metadata_flag != 0) { | |
continue; | |
} | |
while (LoopFlag) { | |
// get packet index | |
if ((ret = relayd_get_next_index(fd, stream.id, index)) < 0) { | |
std::cerr << "relayd_get_next_index failed: " << ret << std::endl; | |
return -1; | |
} | |
if (index.status == LTTNG_VIEWER_INDEX_RETRY) { | |
break; | |
} | |
if (index.status != LTTNG_VIEWER_INDEX_OK) { | |
remove_stream_ids.emplace_back(stream.id); | |
break; | |
} | |
// check flags | |
require_new_metadata = require_new_metadata || (index.flags & LTTNG_VIEWER_FLAG_NEW_METADATA); | |
require_new_streams = require_new_streams || (index.flags & LTTNG_VIEWER_FLAG_NEW_STREAM); | |
if (require_new_metadata || require_new_streams) { | |
break; | |
} | |
// get packet data | |
if ((ret = relayd_get_packet(fd, stream.id, index, packet, packet_data)) < 0) { | |
std::cerr << "relayd_get_packet failed: " << ret << std::endl; | |
return -1; | |
} | |
if (packet.status == LTTNG_VIEWER_GET_PACKET_OK) { | |
std::cout << "relayd_get_packet success: len = " << packet_data.size() << std::endl; | |
} else { | |
std::cerr << "relayd_get_packet failed: status = " << packet.status << std::endl; | |
return -1; | |
} | |
// the format is ctf (common trace format) | |
// I was trying to use babeltrace as a library to parse it | |
// but babeltrace have none docuemnt or examples, sucks as lttng | |
dump(packet_data.c_str(), packet_data.size()); | |
} | |
} | |
// remove destroyed or inactive stream | |
for (std::uint64_t stream_id : remove_stream_ids) { | |
streams.erase(stream_id); | |
} | |
remove_stream_ids.clear(); | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
} | |
std::cout << "shutdown" << std::endl; | |
shutdown(fd, SHUT_RDWR); | |
close(fd); | |
lttng_stop_tracing(SessionName); | |
lttng_destroy_handle(handle); | |
lttng_destroy_session(SessionName); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment