Skip to content

Instantly share code, notes, and snippets.

@elmazzun
Last active January 25, 2017 12:38
Show Gist options
  • Save elmazzun/108757a4eabde90c772bdefaa234d0a6 to your computer and use it in GitHub Desktop.
Save elmazzun/108757a4eabde90c772bdefaa234d0a6 to your computer and use it in GitHub Desktop.
/***************** MAIN() *****************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <iostream>
#include <vector>
#include <list>
#include <thread>
#include <sys/shm.h> /* shmat(), IPC_RMID */
#include <semaphore.h> /* sem_open(), sem_destroy(), sem_wait().. */
#include <fcntl.h> /* O_CREAT, O_EXEC */
#include "ClientManager.h"
// #include "StatManager.h"
#include "InterfacesManager.h"
#include "DequeManager.h"
#include "QOSManager.h"
#define INTERFACE_UPDATE_INTERVAL_SEC 3
using namespace std;
void handle_sigchld(int sig) {
pid_t pid_quit_child = 0;
while ((pid_quit_child = waitpid((pid_t)(-1), 0, WNOHANG)) > 0) {
//debug_red("WAITPID, proc " << pid_quit_child << " done");
}
}
void handle_sigInt(int s){
printf("Caught signal %d\n",s);
exit(1);
}
void usage(char* progName)
{
cout << progName << " [options]" << endl <<
"Options:" << endl <<
"-h Print this help" << endl <<
"-i Interfaces to exclude (separated by ,)" << endl <<
"-l Log file path-name" << endl <<
"-d Algorithm for empting queues (removeAll or removeSome)" << endl <<
"-s Algorithm for avoiding starvation (aging or statistical)" << endl <<
"[-n] No deques and starvation management (normal proxy)" << endl <<
"[-b] Byte to update interfaces statistics (default 250000 byte)" << endl <<
"[-t] Timeout to update unused interfaces in seconds (default 10s)" << endl <<
"[-u] Execute updating on unused interfaces ['yes(default)' or 'no']" << endl <<
"-p The port number to listen" << endl;
}
int main(int argc,char* argv[]) {
char *log_file = NULL;
list<string> interface2exclude;
int listening_port = 0;
int char_opt;
int timeUpdate = TIME_STAT_UPDATE;
int byteUpdate = BLOCK_SIZE_STATS_BYTE;
bool statupdate = true;
const char *removerAlgorithm = NULL;
const char *controllerAlgorithm = NULL;
bool normalProxy = false;
ClientManager cm;
int numConnection = 0;
/****************************************************************/
/* CHECK PARAMETERS */
/****************************************************************/
opterr = 0;
while ((char_opt = getopt (argc, argv, "nhp:i:l:b:u:t:d:s:")) != -1) {
char tmpstr[64];
char *pch;
switch (char_opt) {
case 'd':
if (strcmp(optarg, "removeAll") && strcmp(optarg, "removeSome")) {
usage(argv[0]);
return EXIT_FAILURE;
}
removerAlgorithm = optarg;
cout << "algoritmo di rimozione: " << removerAlgorithm << endl;
break;
case 's':
if (strcmp(optarg, "aging") && strcmp(optarg, "statistical")) {
usage(argv[0]);
return EXIT_FAILURE;
}
controllerAlgorithm = optarg;
cout << "algoritmo contro starvation: " << controllerAlgorithm << endl;
break;
case 'h':
usage(argv[0]);
return EXIT_SUCCESS;
case 'n':
normalProxy = true;
cm.setProxyMode(true);
cout << "This is just a proxy" << endl;
break;
case 'p':
listening_port = atoi(optarg);
break;
case 'b':
byteUpdate = atoi(optarg);
break;
case 't':
timeUpdate = atoi(optarg);
break;
case 'u':
if (!strncmp(optarg, "yes", 3)) {
statupdate = true;
}
else if (!strncmp(optarg, "no", 2)) {
statupdate = false;
}
else {
usage(argv[0]);
return EXIT_FAILURE;
}
break;
case 'i':
strncpy(tmpstr, optarg, sizeof(tmpstr));
pch = strtok (tmpstr, ",");
while (pch != NULL) {
interface2exclude.push_back(pch);
pch = strtok (NULL, ",");
}
break;
case 'l':
log_file = optarg;
break;
case '?':
if ((optopt == 'p') || (optopt == 'i') || (optopt == 'l') ||
(optopt == 'u') || (optopt == 't') || (optopt == 'b')) {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if (isprint (optopt)) {
fprintf (stderr, "Unknown option `-%c'.\n", optopt);
}
else {
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
usage(argv[0]);
return EXIT_FAILURE;
default:
abort ();
}
}
if (optind < argc) {
for (int idx = optind; idx < argc; idx++) {
fprintf (stderr, "Non-option argument %s\n", argv[idx]);
}
usage(argv[0]);
return EXIT_FAILURE;
}
if (log_file == NULL) {
fprintf(stderr, "Insert log file name\n");
usage(argv[0]);
return EXIT_FAILURE;
}
if (listening_port == 0) {
fprintf(stderr, "Insert listening port\n");
usage(argv[0]);
return EXIT_FAILURE;
}
/*printf ("port = %hu, log = %s\n", listening_port, log_file);
for (std::list<std::string>::iterator it_str = interface2exclude.begin(); it_str != interface2exclude.end(); it_str++ ) {
printf ("interface %s\n", it_str->c_str());
}*/
/****************************************************************/
/* STARTING PROXY */
/****************************************************************/
time_t lastInterfaceUpdate, now;
cout << "Start PROXY!!!, id " << this_thread::get_id() << endl;
// register to signal to prevent zombie child
struct sigaction sa;
sa.sa_handler = &handle_sigchld;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGCHLD, &sa, 0) == -1) {
perror("sigaction error");
exit(EXIT_FAILURE);
}
// register to signal catch the kill signal (not yet used)
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = &handle_sigInt;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
if (sigaction(SIGINT, &sigIntHandler, NULL) == -1) {
perror("sigaction error");
exit(EXIT_FAILURE);
}
// init the statModule
std::string logFile = log_file;
// if just a proxy, no algorithms parameters in log file name
if (normalProxy) {
logFile.append("_noAlgos");
}
// is deques and starvation, insert algorithms for removing deques and dealing with starvation
else {
logFile.append("_");
logFile.append(removerAlgorithm);
logFile.append("_");
logFile.append(controllerAlgorithm);
}
// StatManager::getInstance().setFileName(logFile.c_str());
// init the interfaces
InterfacesManager::getInstance().setUpdateFlag(statupdate);
InterfacesManager::getInstance().setTimerUpdate(timeUpdate);
InterfacesManager::getInstance().checkInterfaces(interface2exclude);
//InterfacesManager::getInstance().printInterfaces();
time(&lastInterfaceUpdate);
// init the ClientManager
cm.setByteStat(byteUpdate);
// Parse xml file and get hostnames
QOSManager::getInstance().getAndResolveHostnamesFromXML();
if (!normalProxy) {
DequeManager::getInstance().startRemoverThread(removerAlgorithm);
DequeManager::getInstance().startControllerThread(controllerAlgorithm);
}
debug_default(std::this_thread::get_id() << " THREAD MAIN");
//listening on port
if (cm.startListeningForClient(listening_port)) {
// forever: accept new clients
debug_default("ProxyDASH in ascolto sulla porta " << listening_port);
while(true) {
int new_client_socket = cm.acceptConnectionFromClient();
//debug_yellow("ProxyDASH accepted on " << new_client_socket);
//debug_default("Connection accepted on fd " << new_client_socket);
// if (numConnection == 10) {
// break;
// }
// must re-check interfaces?
time(&now);
if (difftime(now, lastInterfaceUpdate) > INTERFACE_UPDATE_INTERVAL_SEC) {
InterfacesManager::getInstance().checkInterfaces(interface2exclude);
time(&lastInterfaceUpdate);
}
if (new_client_socket >= 0) {
++numConnection;
//cm.forkAndManageClient(); // only the parent process exit from this call
//thread t = thread(&ClientManager::manageClient, &cm);
//t.join();
cm.manageClient();
}
else {
perror("accept error");
}
}
}
else {
perror("Error on start listening");
}
QOSManager::getInstance().showConnectionsMap();
// free interface mmap memory
InterfacesManager::getInstance().freeMemory();
// StatManager::getInstance().freeMemory();
cout << "End PROXY!!!" << endl;
return EXIT_SUCCESS;
}
/***************** ClientManager.h *****************/
#ifndef CLIENTMANAGER_H_
#define CLIENTMANAGER_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <iostream>
#include <vector>
#include <list>
#include <fstream>
// RAW???
// #include <netpacket/packet.h>
// #include <net/ethernet.h>
#include "RequestManager.h"
#include "Miscellaneous.h"
#include "QOSManager.h"
#include "DequeManager.h"
#include "InterfacesManager.h"
// #include "StatManager.h"
#include "HTTPManager.h"
#define BIG_BUFF_LEN 16384
class ClientManager {
public:
ClientManager();
virtual ~ClientManager();
void setProxyMode(bool isjustProxy);
bool startListeningForClient(int port);
int acceptConnectionFromClient(void);
int manageClient(void);
//int forkAndManageClient(void);
void setByteStat(int byteS);
void setDiscardFlag(bool discard);
void setAlgoOptions(char algo,int offset,char* quality);
private:
bool getRequestFromClient(void);
// bool manageRequest(void);
// void forkAndUpdateStats(struct sockaddr_in *addr_in);
// bool sendGETtoDest(struct sockaddr_in *if_to_bind);
// void manageTransferFromDestToClient(struct sockaddr_in *if_used);
// void manageTransferOnStatUpdate(struct sockaddr_in *if_used);
void initVideoRequest();
private:
int sockfd_VideoClient;
int new_sockfd_VideoClient;
// int sockfd_VideoServer;
bool isJustAProxy;
int byte_update;
bool discard_MPEGDASH;
char algo;
char* quality;
int offset;
int requestSize;
char buffer[16384];
struct sockaddr_in if_main; //main interface
//std::list<struct sockaddr_in> it_used;
std::list<struct sockaddr_in> if_to_use;
RequestManager rm;
// HTTPManager hm;
};
#endif /* CLIENTMANAGER_H_ */
/***************** ClientManager.cpp *****************/
#include "ClientManager.h"
/*static long long timevaldiff_usec(struct timeval *start, struct timeval *end) {
return (end->tv_sec * 1000000 + end->tv_usec) - (start->tv_sec * 1000000 + start->tv_usec);
}*/
inline std::ostream& operator<<(std::ostream &o, const infoPkt& p) {
o << "C " << p.clientFd << ", S " << p.serverFd << ", " <<
(p.request == GET_REQ ? "GET" : "CONNECT") << ", " <<
(p.direction == DIRECTION_UPLOAD ? "UPLOAD" : "DOWNLOAD") << ", " <<
"prio " << p.priority << ", size " << p.payloadSize << ", host " << p.host <<
", first " << p.first << ", last " << p.last;
return o;
}
ClientManager::ClientManager() {
debug_default("Created ClientManager");
new_sockfd_VideoClient = -1;
sockfd_VideoClient = -1;
requestSize = 0;
byte_update = BLOCK_SIZE_STATS_BYTE;
isJustAProxy = false;
}
ClientManager::~ClientManager() {
debug_yellow("DESTROYED ClientManager");
}
void ClientManager::setProxyMode(bool isjustProxy) {
isJustAProxy = isjustProxy;
}
void ClientManager::setByteStat(int byteS) {
byte_update = byteS;
}
bool ClientManager::startListeningForClient(int port) {
struct sockaddr_in serv_addr;
bzero((char*)&serv_addr,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_port=htons(port);
serv_addr.sin_addr.s_addr=INADDR_ANY;
sockfd_VideoClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockfd_VideoClient < 0) {
perror("Problem in initializing socket");
return false;
}
if (bind(sockfd_VideoClient,(struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Error on binding");
return false;
}
if (listen(sockfd_VideoClient, 50) < 0) {
perror("Error on listen");
return false;
}
return true;
}
int ClientManager::acceptConnectionFromClient(void) {
struct sockaddr_in cli_addr;
unsigned int clilen;
bzero((char*)&cli_addr, sizeof(cli_addr));
clilen = sizeof(cli_addr);
new_sockfd_VideoClient = accept(sockfd_VideoClient, (struct sockaddr*)&cli_addr, &clilen);
if (new_sockfd_VideoClient < 0) {
perror("ERROR on accept");
}
return new_sockfd_VideoClient;
}
static bool is_valid_fd(int fd) {
return fcntl(fd, F_GETFL) != -1 || errno != EBADF;
}
int ClientManager::manageClient(void) {
if (getRequestFromClient()) {
std::string host_string = std::string(rm.getHostName());
infoPkt newElem;
memset(&newElem, 0, sizeof(newElem));
newElem.clientFd = new_sockfd_VideoClient;
newElem.serverFd = -1;
newElem.request = (rm.isGET() ? GET_REQ : CONNECT_REQ);
newElem.direction = DIRECTION_UPLOAD;
newElem.priority = QOSManager::getInstance().whatPriorityHasConn(host_string);
strncpy(newElem.payload, rm.getCopyOfRequest(), requestSize);
newElem.payloadSize = requestSize;
newElem.serveraddr = rm.getServerAddr();
strcpy(newElem.host, rm.getHostName());
newElem.start = std::chrono::system_clock::now();
newElem.first = true;
newElem.last = false;
if (isJustAProxy) {
InterfacesManager::getInstance().chooseIFMain(if_main, if_to_use);
int pid = fork();
if (pid < 0) {
}
else if (pid > 0) {
}
else {
close(sockfd_VideoClient);
InterfacesManager::getInstance().setUsed(if_main.sin_addr.s_addr);
HTTPManager::dealConn(newElem, if_main);
InterfacesManager::getInstance().setFree(if_main.sin_addr.s_addr);
_exit(EXIT_SUCCESS);
}
}
else {
DequeManager::getInstance().insert(newElem);
debug_green(std::this_thread::get_id() << " ClientManager::manageClient() inserted " << newElem << " --->\n" << newElem.payload);
}
}
else {
debug_red("ClientManager::manageClient()...getRequestFromClient() FAILED");
close(new_sockfd_VideoClient);
new_sockfd_VideoClient = -1;
}
return 1;
}
bool ClientManager::getRequestFromClient(void) {
if (new_sockfd_VideoClient < 0) return false;
bzero(buffer, sizeof(buffer));
int nrcv = recv(new_sockfd_VideoClient, buffer, sizeof(buffer), 0);
if (nrcv < 0) {
perror("Error recv from client socket");
return false;
}
else if (nrcv == 0) {
debug_high("ClientManager::getRequestFromClient - Connection closed by client\n");
return false;
}
else {
if (!rm.load_req(buffer, nrcv)) {
return false;
}
requestSize = nrcv;
debug_default(std::this_thread::get_id() << " ClientManager::getRequestFromClient ricevuti da client " << requestSize << " byte");
}
return true;
}
/***************** HTTPManager.h *****************/
#ifndef HTTPMANAGER_H_
#define HTTPMANAGER_H_
#include <iostream>
#include <string.h>
#include <string>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <net/if.h>
#include <ifaddrs.h>
#include <linux/if_ether.h>
#include <net/ethernet.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <pcap.h>
#include <fstream>
#include "Miscellaneous.h"
// #include "StatManager.h"
#include "RequestManager.h"
#include "InterfacesManager.h"
#include "DequeManager.h"
#include "QOSManager.h"
class HTTPManager {
public:
static int openBindConnect(in_addr_t addr, int req, struct sockaddr_in *interface);
static bool send200OKtoClient(int clientfd);
static bool tunnel(int clientfd, int serverfd, struct sockaddr_in *if_main, std::string host);
static bool sendGET(int serverfd, std::string getReq);
static bool recvGET(int clientfd, int serverfd, struct sockaddr_in *if_used);
static void dealPkt(infoPkt infoPkt, struct sockaddr_in if_main);
static bool dealConn(infoPkt info_conn, struct sockaddr_in if_main);
static void pcapSession(char addr1[], unsigned short port1, char addr2[], unsigned short port2,
const char *interface, std::string remoteHostname);
};
#endif /* HTTPMANAGER_H_ */
/***************** HTTPManager.cpp *****************/
#include "HTTPManager.h"
static long long timevaldiff_usec(struct timeval *start, struct timeval *end) {
return (end->tv_sec * 1000000 + end->tv_usec) - (start->tv_sec * 1000000 + start->tv_usec);
}
static bool is_valid_fd(int fd) {
return fcntl(fd, F_GETFL) != -1 || errno != EBADF;
}
inline std::ostream& operator<<(std::ostream &o, const infoPkt& p) {
o << "C " << p.clientFd << ", S " << p.serverFd << ", " <<
(p.request == GET_REQ ? "GET" : "CONNECT") << ", " <<
(p.direction == DIRECTION_UPLOAD ? "UPLOAD" : "DOWNLOAD") << ", " <<
"prio " << p.priority << ", size " << p.payloadSize << ", host " << p.host <<
", first " << p.first << ", last " << p.last;
return o;
}
void HTTPManager::dealPkt(infoPkt p, struct sockaddr_in if_main) {
// InterfacesManager::getInstance().setUsed(if_main.sin_addr.s_addr);
debug_default(std::this_thread::get_id() << " HTTPManager::dealPkt() thread " << std::this_thread::get_id() << " started");
int client = p.clientFd;
int server = p.serverFd;
int request = p.request;
int priority = p.priority;
std::string host = p.host;
// debug_green("HTTPManager::dealPkt() host " << host << " has priority " << priority);
// ...pkt di richiesta connessione: aprire connessione
// server = openBindConnect(p.serveraddr, request, &if_main);
// if (server < 0) {
// debug_red("HTTPManager::dealPkt() Error opening remote connection with " << host);
// return;
// }
debug_default(std::this_thread::get_id() << " HTTPManager::dealPkt() dealing with socket fd pair (" << client << " " << server << ")");
if (!is_valid_fd(p.clientFd)) { debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt() clientFd not valid"); return; }
if (!is_valid_fd(server)) { debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt() serverFd not valid"); return; }
// se e' una richiesta GET...
if (request == GET_REQ) {
// ...mandare la richiesta al server
debug_yellow(std::this_thread::get_id() << " HTTPManager::dealPkt() invia " << p);
if (send(server, p.payload, p.payloadSize, 0) > 0) {
// e accodare tutti i pacchetti che il server invia al proxy
char buffer[MAX_PAYLOAD_SIZE];
int n_recv = 0;
bool lastPacket;
do {
memset(buffer, 0, sizeof(buffer));
n_recv = recv(server, buffer, sizeof(buffer), 0);
if (n_recv > 0) {
lastPacket = false;
}
else if (n_recv == 0) {
lastPacket = true;
}
else {
perror("HTTPManager::dealPkt() [GET] Error receiving from remote server");
lastPacket = true;
}
infoPkt newElem;
memset(&newElem, 0, sizeof(newElem));
newElem.clientFd = client;
newElem.serverFd = server;
newElem.request = GET_REQ;
newElem.direction = DIRECTION_DOWNLOAD;
newElem.priority = priority;
strncpy(newElem.payload, buffer, n_recv);
newElem.payloadSize = n_recv;
newElem.serveraddr = p.serveraddr;
strcpy(newElem.host, p.host);
newElem.start = std::chrono::system_clock::now();
newElem.first = false;
newElem.last = lastPacket;
// debug_red("HTTPManager::dealPkt() " << strlen(newElem.payload) << " =? " << newElem.payloadSize);
debug_green(std::this_thread::get_id() << " HTTPManager::dealPkt() [GET] accodo pkt " << newElem);
if (!DequeManager::getInstance().insert(newElem)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt() [GET] thread" << std::this_thread::get_id() << " failed to insert pkt");
return;
}
else {
}
} while (n_recv > 0);
debug_default(std::this_thread::get_id() << " HTTPManager::dealPkt() [GET] done receiving from server");
}else {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt() [GET] failed to send REQUEST to server\n");
}
}
// se e' una richiesta CONNECT...
else if (request == CONNECT_REQ) {
//...mandare 200 OK al client
if (!send200OKtoClient(client)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt send 200 ok to client failed");
return;
}
debug_default(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] sent 200 OK to client");
int r;
int recvFromClient = 0, recvFromServer = 0;
// int sendToClient = 0, sentToServer = 0;
int maxFd = client >= server ? client+1 : server+1;
debug_yellow(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] max fd+1 between client " << client << " and server " <<
server << ": " << maxFd);
fd_set readSet;
struct timeval timeout;
char buffer[MAX_PAYLOAD_SIZE];
int bufferSize = sizeof(buffer);
bool lastPacket;
// tunnel
while(true) {
memset(buffer, 0, sizeof(buffer));
FD_ZERO(&readSet);
FD_SET(client, &readSet);
FD_SET(server, &readSet);
timeout.tv_sec = 5;
timeout.tv_usec = 0;
r = select(maxFd, &readSet, NULL, NULL, &timeout);
if (r < 0) {
perror("HTTPManager::dealPkt [CONNECT] error select");
infoPkt errorPacket;
memset(&errorPacket, 0, sizeof(errorPacket));
errorPacket.clientFd = client;
errorPacket.serverFd = server;
errorPacket.request = request;
errorPacket.direction = DIRECTION_UPLOAD;
errorPacket.priority = priority;
errorPacket.payloadSize = 0;
errorPacket.serveraddr = p.serveraddr;
strcpy(errorPacket.host, p.host);
errorPacket.start = std::chrono::system_clock::now();
errorPacket.first = false;
errorPacket.last = true;
if (!DequeManager::getInstance().insert(errorPacket)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] thread " << std::this_thread::get_id() <<
" FAILED to insert ");
}
break;
}
else if (r == 0) {
debug_yellow(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] timeout expired, nothing to read");
continue;
}
if (FD_ISSET(client, &readSet)) {
recvFromClient = recv(client, buffer, bufferSize, 0);
if (recvFromClient <= 0) {
if (recvFromClient < 0) {
perror("HTTPManager::dealPkt [CONNECT] recv() from client");
}
debug_green(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] recv() 0 from client, crafting last packet " << client << " " << server);
infoPkt lastPacket;
memset(&lastPacket, 0, sizeof(lastPacket));
lastPacket.clientFd = client;
lastPacket.serverFd = server;
lastPacket.request = request;
lastPacket.direction = DIRECTION_UPLOAD;
lastPacket.priority = priority;
lastPacket.payloadSize = 0;
lastPacket.serveraddr = p.serveraddr;
strcpy(lastPacket.host, p.host);
lastPacket.start = std::chrono::system_clock::now();
lastPacket.first = false;
lastPacket.last = true;
if (!DequeManager::getInstance().insert(lastPacket)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] FAILED TO INSERT END CONNECTION PKT");
}
break;
}
infoPkt newElem;
memset(&newElem, 0, sizeof(newElem));
newElem.clientFd = client;
newElem.serverFd = server;
newElem.request = request;
newElem.direction = DIRECTION_UPLOAD;
newElem.priority = priority;
strncpy(newElem.payload, buffer, recvFromClient);
newElem.payloadSize = recvFromClient;
newElem.serveraddr = p.serveraddr;
strcpy(newElem.host, p.host);
newElem.start = std::chrono::system_clock::now();
newElem.first = false;
newElem.last = false;
if (!DequeManager::getInstance().insert(newElem)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] FAILED TO INSERT END CONNECTION PKT");
}
else {
debug_default(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] queued up pkt from client, " << recvFromClient << " bytes");
}
}
if (FD_ISSET(server, &readSet)) {
recvFromServer = recv(server, buffer, bufferSize, 0);
if (recvFromServer <= 0) {
if (recvFromServer < 0) {
perror("HTTPManager::dealPkt [CONNECT] recv() from server");
}
debug_green(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] recv() 0 from client, crafting last packet " << client << " " << server);
infoPkt lastPacket;
memset(&lastPacket, 0, sizeof(lastPacket));
lastPacket.clientFd = client;
lastPacket.serverFd = server;
lastPacket.request = request;
lastPacket.direction = DIRECTION_DOWNLOAD;
lastPacket.priority = priority;
lastPacket.payloadSize = 0;
lastPacket.serveraddr = p.serveraddr;
strcpy(lastPacket.host, p.host);
lastPacket.start = std::chrono::system_clock::now();
lastPacket.first = false;
lastPacket.last = true;
if (!DequeManager::getInstance().insert(lastPacket)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] FAILED TO INSERT END CONNECTION PKT");
}
break;
}
infoPkt newElem;
memset(&newElem, 0, sizeof(newElem));
newElem.clientFd = client;
newElem.serverFd = server;
newElem.request = request;
newElem.direction = DIRECTION_DOWNLOAD;
newElem.priority = priority;
strncpy(newElem.payload, buffer, recvFromServer);
newElem.payloadSize = recvFromServer;
newElem.serveraddr = p.serveraddr;
strcpy(newElem.host, p.host);
newElem.start = std::chrono::system_clock::now();
newElem.first = false;
newElem.last = false;
if (!DequeManager::getInstance().insert(newElem)) {
debug_red(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] FAILED TO INSERT END CONNECTION PKT");
}
debug_default(std::this_thread::get_id() << " HTTPManager::dealPkt [CONNECT] queued up pkt from server, " << recvFromServer <<
" bytes");
}
}
}
// InterfacesManager::getInstance().setFree(if_main.sin_addr.s_addr);
debug_green(std::this_thread::get_id() << " HTTPManager::dealPkt dealPkt() thread terminated");
}
// restituisce nome interfaccia "wlan0", "wlan1", ... dato un indirizzo IP
std::string getInterfaceNameFromIP(sockaddr_in interface) {
struct ifaddrs *addrs, *iap;
struct sockaddr_in *sa;
char buf[32];
std::string ret;
char bufInterface[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(interface.sin_addr), bufInterface, INET_ADDRSTRLEN);
// debug_red("IP DA CONFRONTARE " << interface.sin_addr.s_addr <<
// " CHE SAREBBE " << bufInterface);
getifaddrs(&addrs);
for (iap = addrs; iap != NULL; iap = iap->ifa_next) {
if (iap->ifa_addr && (iap->ifa_flags & IFF_UP) && iap->ifa_addr->sa_family == AF_INET) {
sa = (struct sockaddr_in *)(iap->ifa_addr);
inet_ntop(iap->ifa_addr->sa_family, (void *)&(sa->sin_addr), buf, sizeof(buf));
// debug_red("TROVATO " << buf);
if (strcmp(bufInterface, buf) == 0) {
// debug_green("getInterfaceNameFromIP " << bufInterface << " E' UGUALE A " << buf << ", RESTITUIRE " << iap->ifa_name);
ret = std::string(iap->ifa_name);
break;
}
}
}
freeifaddrs(addrs);
return ret;
}
/********************************** IP header **********************************/
struct iphdr *getIPheader(const u_char *packet, unsigned short *iphdrlen) {
struct iphdr *iph = (struct iphdr *)(packet + sizeof(struct ethhdr));
*iphdrlen = iph->ihl*4;
return iph;
}
/********************************** TCP header **********************************/
struct tcphdr *getTCPheader(const u_char *packet, unsigned short iphdrlen) {
return (struct tcphdr *)(packet + sizeof(struct ethhdr) + iphdrlen);
}
bool isSYNset(struct tcphdr *tcph) {
return (tcph->syn == 1 && tcph->ack == 0 && tcph->fin == 0);
}
bool isACKset(struct tcphdr *tcph) {
return (tcph->syn == 0 && tcph->ack == 1 && tcph->fin == 0);
}
bool isFINset(struct tcphdr *tcph) {
return (tcph->syn == 0 && tcph->ack == 0 && tcph->fin == 1);
}
bool areSYNandACKset(struct tcphdr *tcph) {
return (tcph->syn == 1 && tcph->ack == 1 && tcph->fin == 0);
}
bool areFINandACKset(struct tcphdr *tcph) {
return (tcph->syn == 0 && tcph->ack == 1 && tcph->fin == 1);
}
unsigned short getSrcPort(struct tcphdr *tcph) {
return ((unsigned short)ntohs(tcph->source));
}
unsigned short getDstPort(struct tcphdr *tcph) {
return ((unsigned short)ntohs(tcph->dest));
}
// CONVENZIONE: addr1 e' dell'interfaccia, addr2 e' del server remoto
void HTTPManager::pcapSession(char addr1[], unsigned short port1, char addr2[], unsigned short port2, const char *interface,
std::string remoteHostname) {
int res;
// char *dev = NULL;
char errbuf[PCAP_ERRBUF_SIZE];
pcap_t *handle;
struct bpf_program fp;
bpf_u_int32 mask;
bpf_u_int32 net;
struct pcap_pkthdr *header;
const u_char *pkt_data;
const struct ethhdr *ethdr = NULL;
const struct iphdr *iph = NULL;
const struct tcphdr *tcph = NULL;
unsigned short iphdrlen = 0;
unsigned short tcphdrlen = 0;
unsigned short sourcePort = 0, destPort = 0;
struct sockaddr_in sourceAddr, destAddr;
char IPsrc[INET_ADDRSTRLEN], IPdst[INET_ADDRSTRLEN];
// bool firstRoundUpload = true;
// bool firstRoundDownload = true;
struct timeval oldTimevalUpload;
struct timeval oldTimevalDownload;
memset(&oldTimevalUpload, 0, sizeof(oldTimevalUpload));
memset(&oldTimevalDownload, 0, sizeof(oldTimevalDownload));
// unsigned int pktsUpload = 0, pktsDownload = 0;
// filter: ip and tcp and ((src addr1 && dst addr2) || (src addr2 && dst addr1))
std::string filterString = "tcp and ((src ";
filterString.append(addr1);
filterString.append(" && dst ");
filterString.append(addr2);
filterString.append(") || (src ");
filterString.append(addr2);
filterString.append(" && dst ");
filterString.append(addr1);
filterString.append("))");
// QOSManager::getInstance().whatPriorityHasConn(host_string);
const char *filter_exp = filterString.c_str();
// debug_green(std::this_thread::get_id() << ' ' << filter_exp);
/* Find the properties for the device */
if (pcap_lookupnet(interface, &net, &mask, errbuf) == -1) {
fprintf(stderr, "Couldn't get netmask for device %s: %s\n", interface, errbuf);
net = 0;
mask = 0;
}
handle = pcap_open_live(interface, 65535, 0, 1000, errbuf);
if (handle == NULL) {
fprintf(stderr, "Couldn't open device %s: %s\n", interface, errbuf);
return;
}
/* Compile and apply the filter */
if (pcap_compile(handle, &fp, filter_exp, 0, net) == -1) {
fprintf(stderr, "Couldn't parse filter %s: %s\n", filter_exp, pcap_geterr(handle));
return;
}
if (pcap_setfilter(handle, &fp) == -1) {
fprintf(stderr, "Couldn't install filter %s: %s\n", filter_exp, pcap_geterr(handle));
return;
}
while (((res = pcap_next_ex(handle, &header, &pkt_data)) >= 0)/* && (!done)*/) {
if (res == 0) { // timeout elapsed
continue;
}
ethdr = (struct ethhdr *)pkt_data;
iph = getIPheader(pkt_data, &iphdrlen);
if (!iph) {
debug_red("Failed to retrieve IP header");
return;
}
if (iphdrlen < 20) {
debug_red("Invalid IP header length: " << iphdrlen);
return;
}
memset(&(sourceAddr), 0, sizeof(sourceAddr));
sourceAddr.sin_addr.s_addr = iph->saddr;
memset(&(destAddr), 0, sizeof(destAddr));
destAddr.sin_addr.s_addr = iph->daddr;
if (inet_ntop(AF_INET, &(sourceAddr.sin_addr), IPsrc, INET_ADDRSTRLEN) == NULL) {
debug_red("Failed to retrieve source address");
return;
}
if (inet_ntop(AF_INET, &(destAddr.sin_addr), IPdst, INET_ADDRSTRLEN) == NULL) {
debug_red("Failed to retrieve destination address");
return;
}
tcph = getTCPheader(pkt_data, iphdrlen);
if (!tcph) {
debug_red("Failed to retrieve TCP header");
return;
}
tcphdrlen = tcph->doff*4;
if (tcphdrlen < 20) {
debug_red("Invalid TCP header length: " << tcphdrlen);
return;
}
sourcePort = ntohs(tcph->source);
destPort = ntohs(tcph->dest);
// debug_default(IPsrc << ":" << sourcePort << " -> " << IPdst << ":" << destPort);
// incrementare numero pacchetti (uso convenzione che addr1 contiene indirizzo interfaccia
// e che port 1 sia la porta dell'interfaccia):
// se addr1(interfaccia) e' sorgente e addr2(remote) e' destinazione, UPLOAD
if ((strcmp(addr1, IPsrc) == 0) && (strcmp(addr2, IPdst) == 0) &&
(sourcePort == port1) && (destPort == port2)) {
// debug_green(IPsrc << ":" << sourcePort << " -> " << IPdst << ":" << destPort << " IS UPLOAD");
// std::string fileName = "LOG_UPLOAD_"+remoteHostname;
// // fsUpload.open(fileName, std::ios::app);
long long useconds = (header->ts.tv_sec * 1000000) + header->ts.tv_usec;
long long delay = ((header->ts.tv_sec + oldTimevalUpload.tv_sec) * 1000000) -
oldTimevalUpload.tv_usec + header->ts.tv_usec;
// std::lock_guard<std::mutex> lock(mtx);
// StatManager::getInstance().actualQoSstats.numPacketsUpload++;
long long pps = ((long long)(pkt_data) * 1000000) / delay;
long long bps = ((long long)(pkt_data+8) * 8 * 1000000) / delay;
std::string mylog = remoteHostname + " UPLOAD " + std::to_string(useconds) + " " +
std::string(IPsrc) + ":" + std::to_string(port1) +
" " + std::string(IPdst) + ":" + std::to_string(port2) + " " + std::to_string(header->len) +
/*std::to_string(pps) +*/ " " + std::to_string(bps) + '\n';
// debug_red(mylog);
// StatManager::getInstance().makeQoSstat(mylog);
oldTimevalUpload.tv_sec = header->ts.tv_sec;
oldTimevalUpload.tv_usec = header->ts.tv_usec;
}
// se addr1(interfaccia) e' destinazione e addr2(remote) e' sorgente, DOWNLOAD
else if ((strcmp(addr1, IPdst) == 0) && (strcmp(addr2, IPsrc) == 0) &&
(sourcePort == port2) && (destPort == port1)) {
// debug_green(IPsrc << ":" << sourcePort << " -> " << IPdst << ":" << destPort << " IS DOWNLOAD");
// std::string fileName = "LOG_DOWNLOAD_"+remoteHostname;
// // fsDownload.open(fileName, std::ios::app);
long long useconds = (header->ts.tv_sec * 1000000) + header->ts.tv_usec;
long long delay = ((header->ts.tv_sec + oldTimevalDownload.tv_sec) * 1000000) -
oldTimevalDownload.tv_usec + header->ts.tv_usec;
// std::lock_guard<std::mutex> lock(mtx);
// StatManager::getInstance().actualQoSstats.numPacketsDownload++;
long long pps = ((long long)(pkt_data) * 1000000) / delay;
long long bps = ((long long)(pkt_data+8) * 8 * 1000000) / delay;
std::string mylog = remoteHostname + " DOWNLOAD " + std::to_string(useconds) + " " +
std::string(IPsrc) + ":" + std::to_string(port2) +
" " + std::string(IPdst) + ":" + std::to_string(port1) + " " + std::to_string(header->len) +
/*std::to_string(pps) +*/ " " + std::to_string(bps) + '\n';
// debug_red(mylog);
// StatManager::getInstance().makeQoSstat(mylog);
oldTimevalDownload.tv_sec = header->ts.tv_sec;
oldTimevalDownload.tv_usec = header->ts.tv_usec;
}
}
// to stop loop: pcap_breakloop(handle);
// pcap_freecode(fp);
pcap_close(handle);
// debug_red("QUITTING sniffing between " << IPsrc << " and " << IPdst << ", flag done is " << done);
return;
}
bool HTTPManager::dealConn(infoPkt info, struct sockaddr_in if_main) {
//debug_default("thread " << std::this_thread::get_id() << " dealing conn");
bool ret = false;;
int req = info.request;
// // StatManager::getInstance().actualQoSstats.choosedInterface = if_main.sin_addr;
// // StatManager::getInstance().actualQoSstats.byteUpload = 0;
// // StatManager::getInstance().actualQoSstats.byteDownload = 0;
// // StatManager::getInstance().actualQoSstats.numPacketsUpload = 0;
// // StatManager::getInstance().actualQoSstats.numPacketsDownload = 0;
// // StatManager::getInstance().actualQoSstats.BpsUpload = 0;
// // StatManager::getInstance().actualQoSstats.BpsDownload = 0;
/* addr1 */
struct sockaddr_in structAddrInterface;
memset(&(structAddrInterface), 0, sizeof(structAddrInterface));
structAddrInterface.sin_addr.s_addr = if_main.sin_addr.s_addr;
char IPaddrInterface[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(structAddrInterface.sin_addr), IPaddrInterface, INET_ADDRSTRLEN);
/* addr2 */
struct sockaddr_in structAddrRemote;
memset(&(structAddrRemote), 0, sizeof(structAddrRemote));
structAddrRemote.sin_addr.s_addr = info.serveraddr;
char IPaddrRemote[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(structAddrRemote.sin_addr), IPaddrRemote, INET_ADDRSTRLEN);
std::string iface = getInterfaceNameFromIP(if_main);
// // StatManager::getInstance().actualQoSstats.numPacketsUpload = 0;
// // StatManager::getInstance().actualQoSstats.numPacketsDownload = 0;
// log priority
// // StatManager::getInstance().actualQoSstats.priority = info.priority;
// // log server address
// // StatManager::getInstance().actualQoSstats.destAddr.s_addr = info.serveraddr;
// // log hostname
// strcpy(// StatManager::getInstance().actualQoSstats.hostname, info.host.c_str());
// // log visit to this hostname
// // StatManager::getInstance().actualQoSstats.visitDifference =
// // StatManager::getInstance().getSecondsLastVisit(info.host.c_str());
// debug_red("PID " << getpid() <<" HA CACCIATO IN actualQoSstats " << // StatManager::getInstance().actualQoSstats.visitDifference);
in_addr_t addr = info.serveraddr;
int server = openBindConnect(addr, req, &if_main); //info.serverFd;
debug_yellow("Dealing conn between sock client " <<
info.clientFd << " and sock server " << info.serverFd);
// FOR A BETTER PCAP FILTER
struct sockaddr_in clientAddrLocal, clientAddrRemote;
socklen_t lenclientAddrLocal = sizeof(clientAddrLocal);
socklen_t lenclientAddrRemote = sizeof(clientAddrRemote);
getsockname(info.clientFd, (struct sockaddr *)&clientAddrLocal, &lenclientAddrLocal);
getpeername(info.serverFd, (struct sockaddr *)&clientAddrRemote, &lenclientAddrRemote);
struct sockaddr_in serverAddrLocal, serverAddrRemote;
socklen_t lenserverAddrLocal = sizeof(serverAddrLocal);
socklen_t lenserverAddrRemote = sizeof(serverAddrRemote);
getsockname(server, (struct sockaddr *)&serverAddrLocal, &lenserverAddrLocal);
getpeername(server, (struct sockaddr *)&serverAddrRemote, &lenserverAddrRemote);
// debug_red("getsockname client " << info.client_fd << ": port " << ntohs(clientAddrLocal.sin_port) <<
// "; getpeername client: port " << ntohs(clientAddrRemote.sin_port));
// debug_red("getsockname server " << server << ": port " << ntohs(serverAddrLocal.sin_port) <<
// "; getpeername server: port " << ntohs(serverAddrRemote.sin_port));
// start pcap thread to log number of packets
// std::thread loggerThread (HTTPManager::pcapSession, this, IPaddrInterface, ntohs(serverAddrLocal.sin_port),
// IPaddrRemote, ntohs(serverAddrRemote.sin_port), iface.c_str(), info.host);
// loggerThread.detach();
if (info.request == GET_REQ) {
// // log HTTP request
// strncpy(// StatManager::getInstance().actualQoSstats.HTTPrequest, "GET", 3);
// // log port
// // StatManager::getInstance().actualQoSstats.destPort = 80;
if (sendGET(server, info.payload)) {
if (recvGET(info.clientFd, server, &if_main)) {
//debug_green("done GET between " << info.client_fd << " and " << server << " " << info.host <<
// " proc " << getpid());
ret = true;
}
else {
//debug_red("recvGET error");
ret = false;
}
}
else {
//debug_red("sendGET error");
ret = false;
}
}
else if (info.request == CONNECT_REQ) {
// // log HTTP request
// strncpy(// StatManager::getInstance().actualQoSstats.HTTPrequest, "CONNECT", 7);
// // log port
// // StatManager::getInstance().actualQoSstats.destPort = 443;
if (send200OKtoClient(info.clientFd)) {
if (tunnel(info.clientFd, server, &if_main, info.host)) {
//debug_green("done CONNECT between " << info.client_fd << " and " << server << " " << info.host <<
// " proc " << getpid());
ret = true;
}
else {
//debug_red("tunnel error");
ret = false;
}
}
else {
//debug_red("send200OKtoClient error");
ret = false;
}
}
// pcap thread? stop logging please
// done = true;
// debug_green("connection ended, setting done flag to " << done);
InterfacesManager::getInstance().freeMemory();
// // StatManager::getInstance().makeQoSstat();
debug_green("Dealing conn between sock client " <<
info.clientFd << " and sock server " << server);
return ret;
}
// return connected socket
int HTTPManager::openBindConnect(in_addr_t addr, int req, struct sockaddr_in *interface) {
sockaddr_in remote_host;
remote_host.sin_addr.s_addr = addr;
remote_host.sin_family = AF_INET;
if (req == GET_REQ)
remote_host.sin_port = htons(80);
else
remote_host.sin_port = htons(443);
int remote_sock, bind_res, conn_res;
//remote_sock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
remote_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (remote_sock < 0) {
return -1;
}
else {
// raw sockets bind with setsockopt
//bind_res = setsockopt(remote_sock, SOL_SOCKET, SO_BINDTODEVICE, inet_ntoa(interface->sin_addr), strlen(inet_ntoa(interface->sin_addr)) + 1);
bind_res = bind(remote_sock, (struct sockaddr *)interface, sizeof(struct sockaddr_in));
if (bind_res < 0) {
return -1;
}
else {
//debug_yellow("child " << getpid() << " binded (remote) socket " << remote_sock << " " << host <<
// " to interface " << inet_ntoa(interface->sin_addr));
conn_res = connect(remote_sock, (struct sockaddr*)&remote_host, sizeof(struct sockaddr));
if (conn_res < 0) {
return -1;
}
else {
return remote_sock;
}
}
}
}
bool HTTPManager::sendGET(int serverfd, std::string getReq) {
//debug_green("sendGET to server " << serverfd);
// log start connection
// gettimeofday(&StatManager::getInstance().actualQoSstats.startConnection, NULL);
unsigned int n_send = send(serverfd, getReq.c_str(), strlen(getReq.c_str()), 0);
if (n_send < 0) {
return false;
}
if (n_send == 0) {
return false;
}
if (n_send != strlen(getReq.c_str())) {
debug_red("sendGET: NOT ENOUGH BYTES SENT");
return false;
}
// debug_default("SEND GET: " << getReq);
// // StatManager::getInstance().actualQoSstats.byteUpload = n_send;
return true;
}
bool HTTPManager::recvGET(int clientfd, int serverfd, struct sockaddr_in *if_used) {
char buffer[MAX_PAYLOAD_SIZE];
int n_recv = 0;
int n_tot_recv = 0;
int block_stat_recv = 0;
struct timeval time_st, time_en;
gettimeofday(&time_st, NULL);
//debug_green("recvGET client " << clientfd << " server " << serverfd);
// FILE * pFile = NULL;
// if (isManifest) {
// pFile = fopen ("manifest.mpd", "a+");
// }
do {
memset(buffer, 0, sizeof(buffer));
n_recv = recv(serverfd, buffer, sizeof(buffer), 0);
// if (strstr(buffer, "acc.umu.se")) {
// debug_default("RECV GET " << n_recv << " BYTES: " << std::string(buffer) << '\n');
if (n_recv > 0) {
debug_default(buffer);
int n_sent = send(clientfd, buffer, n_recv, 0);
if (n_sent < 0) {
perror("Error sending to client the server reply");
return false;
}
else if (n_sent == 0) {
//debug_yellow("Connection closed by the client while sending back the message!?!?!");
}
else {
n_tot_recv += n_recv;
block_stat_recv += n_recv;
//debug_yellow(n_recv << " of " << n_tot_recv);
// // StatManager::getInstance().actual_stats.reply_ok = true;
// if (block_stat_recv >= byte_update) {
// gettimeofday(&time_en, NULL);
// InterfacesManager::getInstance().updateInterfaceStats(if_used,
// block_stat_recv, timevaldiff_usec(&time_st, &time_en));
// block_stat_recv = 0;
// gettimeofday(&time_st, NULL);
// }
}
}
else if (n_recv == 0) {
//debug_yellow("recvGET: connection closed by the server, sent " << RED << n_tot_recv << YELLOW << " bytes");
}
else {
perror("Error receiving from server while forwarding");
return false;
}
} while (n_recv > 0);
/*If is a manifest close file and read manifest*/
// if (isManifest) {
// fclose (pFile);
// initVideoRequest(clientfd);
// }
// log total byte downloaded
// // StatManager::getInstance().actualQoSstats.byteDownload = n_tot_recv;
// // log end connection
// gettimeofday(&// StatManager::getInstance().actualQoSstats.endConnection, NULL);
// // StatManager::getInstance().actual_stats.frag_bytesize = n_tot_recv;
// time(&// StatManager::getInstance().actual_stats.end_request_time);
// gettimeofday(&// StatManager::getInstance().actual_stats.end_request_timeval, NULL);
// if (n_tot_recv == block_stat_recv) { // never made stats (packet size less then "byte_update")
// gettimeofday(&time_en, NULL);
// InterfacesManager::getInstance().updateInterfaceStats(if_used, block_stat_recv, timevaldiff_usec(&time_st, &time_en));
// }
// // StatManager::getInstance().makeStat();
close(clientfd);
close(serverfd);
//debug_yellow("Done GET, received " << n_tot_recv << " bytes");
return true;
}
bool HTTPManager::send200OKtoClient(int clientfd) {
//debug_green("CONNECT: send to client \"" << CONNECT_200_OK << "\"\n");
int connect_200_to_client_send = send(clientfd, CONNECT_200_OK, strlen(CONNECT_200_OK), 0);
if (connect_200_to_client_send < 0) {
perror("CONNECT send() 200 OK to client failed");
return false;
}
else if (connect_200_to_client_send == 0) {
//debug_yellow("send 200 OK, client closed connection??\n");
return false;
}
//debug_green("child " << getpid() << " sent 200 OK to client " << clientfd << " went fine");
return true;
}
bool HTTPManager::tunnel(int clientfd, int serverfd, struct sockaddr_in *if_used, std::string host) {
//debug_red("child " << getpid() << " opening TUNNEL between " << clientfd << " and " << serverfd);
int r;
unsigned int totByteUpload = 0, totByteDownload = 0; // tot upload, tot download
unsigned int read_from_server = 0, send_to_client = 0;
unsigned int read_from_client = 0, send_to_server = 0;
//unsigned int numPktsUpload = 0, numPktsDownload = 0;
struct timeval timeout;
char https_buf[MAX_PAYLOAD_SIZE];
int https_buf_size = sizeof(https_buf);
fd_set fdset;
// long long usecondsDiff;
int maxp1 = serverfd >= clientfd ? serverfd+1 : clientfd+1;
bool ret = true;
// struct timeval tm_start, tm_end;
// struct timeval timeUploadStart, timeDownloadStart;
// struct timeval timeUploadEnd, timeDownloadEnd;
// int block_stat_recv = 0;
// gettimeofday(&// StatManager::getInstance().actualQoSstats.startConnection, NULL);
// time(&// StatManager::getInstance().actualQoSstats.startConnTime);
while(true) {
/*gettimeofday(&timeUploadStart, NULL);
gettimeofday(&timeDownloadStart, NULL);*/
memset(https_buf, 0, https_buf_size);
FD_ZERO(&fdset);
FD_SET(clientfd, &fdset);
FD_SET(serverfd, &fdset);
timeout.tv_sec = 5;
timeout.tv_usec = 0;
r = select(maxp1, &fdset, NULL, NULL, &timeout);
// ERROR SELECT
if (r < 0) {
perror("select()");
close(clientfd);
clientfd = -1;
close(serverfd);
serverfd = -1;
ret = false;
/*gettimeofday(&timeUploadEnd, NULL);
gettimeofday(&timeDownloadEnd, NULL);*/
break;
}
// TIMEOUT SELECT
if (r == 0) {
close(clientfd);
clientfd = -1;
close(serverfd);
serverfd = -1;
//ret = false; ... timeout e' da considerarsi errore?
/*gettimeofday(&timeUploadEnd, NULL);
gettimeofday(&timeDownloadEnd, NULL);*/
break;
}
// OK SELECT CLIENT -> SERVER
if (FD_ISSET(clientfd, &fdset)) {
//debug_green("tunnel(): ready to read from client\n");
read_from_client = recv(clientfd, https_buf, https_buf_size, 0);
if (read_from_client < 0) {
perror("recv() from client");
close(clientfd);
clientfd = -1;
close(serverfd);
serverfd = -1;
ret = false;
break;
}
else if (read_from_client == 0) {
close(clientfd);
clientfd = -1;
close(serverfd);
serverfd = -1;
break;
}
// debug_green("byte letti da CLIENT: " << read_from_client);
send_to_server = send(serverfd, https_buf, read_from_client, 0);
if (send_to_server < 0) {
perror("send() to server");
close(clientfd);
clientfd = -1;
close(serverfd);
serverfd = -1;
break;
}
//debug_red("from client " << read_from_client << "; to server: " << send_to_server);
totByteUpload += send_to_server;
}
// OK CLIENT SERVER -> CLIENT
if (FD_ISSET(serverfd, &fdset)) {
read_from_server = recv(serverfd, https_buf, https_buf_size, 0);
if (read_from_server < 0) {
perror("recv() from server");
close(serverfd);
serverfd = -1;
close(clientfd);
clientfd = -1;
ret = false;
break;
}
else if (read_from_server == 0) {
close(serverfd);
serverfd = -1;
close(clientfd);
clientfd = -1;
break;
}
// debug_green("byte letti da SERVER: " << read_from_server);
send_to_client = send(clientfd, https_buf, read_from_server, 0);
if (send_to_client < 0) {
perror("send() to client");
close(clientfd);
clientfd = -1;
close(serverfd);
serverfd = -1;
ret = false;
break;
}
//debug_red("from server " << read_from_server << "; to client: " << send_to_client);
totByteDownload += send_to_client;
/*n_tot_recv += read_from_server;
block_stat_recv += read_from_server;
debug_yellow(n_tot_recv);
if (block_stat_recv >= BLOCK_SIZE_STATS_BYTE ) {
gettimeofday(&time_en, NULL);
InterfacesManager::getInstance().updateInterfaceStats(if_used, block_stat_recv,
timevaldiff_usec(&time_st, &time_en));
block_stat_recv = 0;
gettimeofday(&time_st, NULL);
}*/
}
}
// // StatManager::getInstance().actualQoSstats.byteUpload = totByteUpload;
// // StatManager::getInstance().actualQoSstats.byteDownload = totByteDownload;
// // // StatManager::getInstance().actualQoSstats.numPacketsUpload = numPktsDownload;
// // // StatManager::getInstance().actualQoSstats.numPacketsDownload = numPktsUpload;
// gettimeofday(&// StatManager::getInstance().actualQoSstats.endConnection, NULL);
// time(&// StatManager::getInstance().actualQoSstats.endConnTime);
return ret;
}
/***************** DequeManager.h *****************/
#ifndef DEQUEMANAGER_H_
#define DEQUEMANAGER_H_
#include <thread>
#include <array>
#include <string.h>
#include <algorithm> // max_element()
#include "DequeEntry.h"
#include "HTTPManager.h"
#include "InterfacesManager.h"
#include "Miscellaneous.h"
#include "QOSManager.h"
// #include "SenderThreadManager.h"
class DequeManager {
public:
static DequeManager& getInstance() {
static DequeManager instance;
return instance;
}
bool insert(infoPkt info);
void removeAll(void);
void removeSome(void);
bool removeAndDealPkt(DequeEntry &q);
void aging(void);
void statistical(void);
void printQueues(PrintCases printCase);
void startRemoverThread(const char *remover);
void startControllerThread(const char *controller);
void resetPairPercentages(void);
void dealPktAndInterface(infoPkt ret, struct sockaddr_in interface);
private:
std::string starvationAlgorithm;
std::array<std::pair<int, float>, NUM_DEQUE> percentages;
std::thread removerThread;
std::thread controllerThread;
std::array<DequeEntry, NUM_DEQUE> deques;
};
#endif /* DEQUEMANAGER_H_ */
/***************** DequeManager.cpp *****************/
#include "DequeManager.h"
void DequeManager::dealPktAndInterface(infoPkt ret, struct sockaddr_in interface) {
// std::thread deal = std::thread(&HTTPManager::dealPkt, ret, interface);
// deal.detach();
HTTPManager::dealPkt(ret, interface);
}
inline std::ostream& operator<<(std::ostream &o, const infoPkt& p) {
o << "C " << p.clientFd << ", S " << p.serverFd << ", " <<
(p.request == GET_REQ ? "GET" : "CONNECT") << ", " <<
(p.direction == DIRECTION_UPLOAD ? "UPLOAD" : "DOWNLOAD") << ", " <<
"prio " << p.priority << ", size " << p.payloadSize << ", host " << p.host <<
", first " << p.first << ", last " << p.last;
return o;
}
static bool is_valid_fd(int fd) {
return fcntl(fd, F_GETFL) != -1 || errno != EBADF;
}
bool hasTimeoutExpired(std::chrono::time_point<std::chrono::system_clock> start/*, long &diff*/) {
auto end = std::chrono::system_clock::now();
long diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
// debug_yellow("\nelemento ha trascorso in deque " << diff << " millisecs\ncon un timeout di " <<
// TIMEOUT_AGING << " millisecs,\n" << (diff >= TIMEOUT_AGING ? "da rimuovere" : "puo' ancora restare"));
return diff >= TIMEOUT_AGING;
}
// resettare le std::pair
void DequeManager::resetPairPercentages(void) {
for (unsigned int index=0; index<percentages.size(); index++)
percentages[index] = std::make_pair(index, 0.0);
}
// removeAll() o removeSome()
void DequeManager::startRemoverThread(const char *removerAlgorithm) {
if (strcmp(removerAlgorithm, "removeAll") == 0) {
removerThread = std::thread(&DequeManager::removeAll, this);
}
else if (strcmp(removerAlgorithm, "removeSome") == 0) {
int howMany = NUM_DEQUE;
for (int i=0; i<NUM_DEQUE; i++, howMany--) {
DequeEntry &d = deques.at(i);
d.setHowManyElems(howMany);
debug_green("Deque n. " << i << " will have removed " << d.getHowManyElems() << " elements");
}
removerThread = std::thread(&DequeManager::removeSome, this);
}
removerThread.detach();
debug_green("remover thread with method DequeManager::" << removerAlgorithm << "() detached");
}
// aging() o statistical()
void DequeManager::startControllerThread(const char *controllerAlgorithm) {
if (strcmp(controllerAlgorithm, "aging") == 0) {
controllerThread = std::thread(&DequeManager::aging, this);
}
else if (strcmp(controllerAlgorithm, "statistical") == 0) {
controllerThread = std::thread(&DequeManager::statistical, this);
// inizializzo std::array<<std::pair<int, float>, NUM_DEQUE> percentages
resetPairPercentages();
}
controllerThread.detach();
starvationAlgorithm = std::string(controllerAlgorithm);
debug_green("controller thread with method DequeManager::" << controllerAlgorithm << "() detached");
}
void DequeManager::printQueues(PrintCases printCase) {
const char *color = NULL;
switch (printCase) {
case INSERT: {
color = GREEN;
break;
}
case REMOVE: {
color = YELLOW;
break;
}
case HANDLE: {
color = RED;
break;
}
default:
break;
}
for (auto &q : deques)
std::cout << color << q.size() << ' ' << std::flush;
std::cout << DEFAULTCOLOR << '\n' << std::flush;
}
bool DequeManager::insert(infoPkt info) {
//debug_red("insert priority " << info.priority);
DequeEntry &q = deques.at(info.priority-1);
if (q.insert(info)) {
printQueues(INSERT);
// debug_default("client " << info.clientFd << " server " << info.serverFd);
if (starvationAlgorithm == "statistical") {
q.increaseInserted();
}
return true;
}
else {
debug_red("FAILED DequeManager::insert()");
return false;
}
}
bool DequeManager::removeAndDealPkt(DequeEntry &q) {
bool ret = false;
infoPkt p;
struct sockaddr_in if_main;
std::list<struct sockaddr_in> if_to_use;
int serverFd = -1;
if (q.removeWithinTimeout(p)) {
if (starvationAlgorithm == "statistical") {
q.increaseRemoved();
}
printQueues(REMOVE);
// METTERE DOVE SERVE SETTAGGI DELL'INTERFACCIA USATA
InterfacesManager::getInstance().chooseIFMain(if_main, if_to_use);
// dealPktAndInterface(p, if_main);
if (p.serverFd < 0) {
serverFd = HTTPManager::openBindConnect(p.serveraddr, p.request, &if_main);
debug_default(std::this_thread::get_id() << " DequeManager::removeAndDealPkt created pair (" << p.clientFd << " " << serverFd << ")");
// return true;
if (serverFd <= 0) {
debug_red("DequeManager::removeAndDealPkt serverFd non valido: " << serverFd);
ret = false;
}
else {
p.serverFd = serverFd;
std::thread queuingThread(HTTPManager::dealPkt, p, if_main);
queuingThread.detach();
}
}
else {
if (p.last) {
debug_default(std::this_thread::get_id() << " DequeManager::removeAndDealPkt get LAST PACKET, closing fds " << p.clientFd << " and " << p.serverFd);
close(p.clientFd);
close(p.serverFd);
}
else {
// senderManager.insertPacket(p);
debug_yellow(std::this_thread::get_id() << " DequeManager::removeAndDealPkt send() " << p);
// debug_red("DequeManager::removeAndDealPkt " << strlen(p.payload) << " =? " << p.payloadSize);
if (!is_valid_fd(p.clientFd)) {
debug_red("DequeManager::removeAndDealPkt BAD CLIENT FD");
}
if (!is_valid_fd(p.serverFd)) {
debug_red("DequeManager::removeAndDealPkt BAD SERVER FD");
}
int sent_bytes = 0;
if ((sent_bytes = send((p.direction == DIRECTION_UPLOAD ? p.serverFd : p.clientFd),
p.payload, p.payloadSize, MSG_NOSIGNAL)) < 0) {
perror("DequeManager::removeAndDealPkt send()");
ret = false;
}
else {
debug_default(std::this_thread::get_id() << " DequeManager::removeAndDealPkt sent_bytes " << sent_bytes);
}
}
}
ret = true;
}
return ret;
}
void DequeManager::removeAll(void) {
while (true) {
for (auto &q : deques) {
//debug_green("DequeManager::removeAll() loop while deque " << numQueue << " not empty");
if (removeAndDealPkt(q))
// questo break interrompe il for() e ricomincia da capo, sempre dalla prima coda:
// cosi' verranno sempre rimossi gli elementi dalle code con priorita' maggiore
break;
}
}
}
void DequeManager::removeSome(void) {
//debug_red("DequeManager::remove()");
int i;
int elements;
while (true) {
for (auto &q : deques) {
elements = q.getHowManyElems();
// a seconda della coda in cui mi trovo, calcolo il numero di elementi da rimuovere
for (i=0; i<elements; i++) {
if (!removeAndDealPkt(q)) {
// rimozione fallita? andiamo alla prossima coda
break;
}
}
}
}
}
void DequeManager::aging(void) {
int i;
//int timeWait = 10000;
//long difference;
std::deque<infoPkt>::iterator it;
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_AGING));
// debug_red("DequeManager::aging() in action every " << TIMEOUT_AGING << " millisecs");
for (i=1; i<NUM_DEQUE; i++) {
if (!deques[i].isEmpty()) {
for (it = deques[i].begin(); it!=deques[i].end(); ) {
// A CHE SERVE PASSARE DIFFERENCE???
if (hasTimeoutExpired(it->start/*, difference*/)) {
infoPkt ic;
if (deques[i].removeAt(ic, it)) {
ic.priority = ic.priority-1;
ic.start = std::chrono::system_clock::now();
deques[i-1].insert(ic);
// printQueues(HANDLE);
}
}
else {
++it;
}
}
}
}
}
}
void DequeManager::statistical(void) {
unsigned int index;
//int timeWait = 10000;
int whereToRemove = 0;
struct sockaddr_in if_main;
std::list<struct sockaddr_in> if_to_use;
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_AGING));
// debug_red("DequeManager::statistical() in action every " << TIMEOUT_AGING << " millisecs");
for (index=0; index<deques.size(); index++) {
DequeEntry &d = deques.at(index);
if (d.getRemoved() == 0) {
percentages[index].second = 0.0;
}
else {
percentages[index].second =
((float)d.getRemoved() / (float)d.getInserted()) * 100;
}
// debug_red("deque n. " << index << " removed: " << d.getRemoved() << ", inserted: " <<
// d.getInserted() << " e' stata rimossa del " << percentages[index].second << "%");
d.resetCounters();
}
// percentages potrebbe essere come segue:
// (0, 23.5), (1, 12.1), (2, 23.5), (3, 5.5)
std::sort(percentages.begin(), percentages.end(), [] (
const std::pair<int,float> &left, const std::pair<int,float> &right) {
return left.second > right.second;
}
);
// dopo il sorting percentages sarebbe ora:
// (0, 23.5), (2, 23.5), (1, 12.1), (3, 5.5)
// a parita' di valore di second, viene comunque data
// priorita' alle deque maggiori (tra la 0 e la 2, prima la 0)
for (auto &pair : percentages) {
if (!deques[pair.first].isEmpty()) {
// debug_red("DequeManager::statistical: " << pair.first << ' ' << pair.second);
infoPkt ic;
whereToRemove = pair.first;
DequeEntry &d = deques.at(whereToRemove);
if (d.removeWithinTimeout(ic)) {
printQueues(REMOVE);
InterfacesManager::getInstance().chooseIFMain(if_main, if_to_use);
dealPktAndInterface(ic, if_main);
close(ic.clientFd);
}
// debug_red("DequeManager::statistical() popping from deque " << whereToRemove);
}
else {
// debug_red("DequeManager::statistical: deque " << pair.first << " empty(), not removing");
}
}
resetPairPercentages();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment