Last active
December 20, 2015 12:29
-
-
Save lucacervasio/6131434 to your computer and use it in GitHub Desktop.
Hercules: a multiple tcp client implementation with socket select
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* | |
* Hercules v0.2 | |
* Copyright (C) 2010 Luca Cervasio | |
* Created by Luca Cervasio <[email protected]> | |
* All rights reserved (C) 2010 Luca Cervasio | |
* You're not allowed to copy or redistribute this software without the author's consent | |
* | |
*/ | |
#include <fcntl.h> | |
#include <pthread.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <sys/socket.h> | |
#include <resolv.h> | |
#include "thrqueue.h" | |
#include "soap.h" | |
#include <netinet/in.h> | |
#include <sys/time.h> | |
#include <unistd.h> | |
#include <sys/types.h> | |
#include <arpa/inet.h> | |
#include <time.h> | |
#include <stdarg.h> | |
#include <sys/time.h> | |
#include <getopt.h> | |
#define NUM_CPE 1 | |
#define NUM_PARALLEL_CPES 1 | |
#define ACS_IP "127.0.0.1" | |
#define ACS_PORT 8080 | |
#define ACS_URL "/acs/index" | |
#define UA "Hercules v0.2 by Luca Cervasio" | |
#define IP "10.19.0.2" | |
#define CONNECT_PORT 9600 | |
#define PERIODIC_INTERVAL 120 | |
#define DEBUG 0 | |
typedef struct cpe { | |
char serial[10]; | |
char conn_req[100]; | |
int online; | |
char manufacturer[50]; | |
char sw_ver[50]; | |
int socknum; | |
char *http_buf; | |
int http_buf_len; | |
int request_idx; | |
char *event; | |
} t_cpe; | |
typedef struct tosay { | |
char *serial; | |
//char *event; | |
} t_tosay; | |
struct Queue *q; | |
t_cpe cpes[NUM_CPE]; | |
int active_connections = 0; | |
int fdmax; | |
fd_set master; | |
fd_set connecting_fds; | |
fd_set read_fds; | |
fd_set write_fds; | |
char* string_append(char* ptr, char* dataptr) { | |
char * tmp = strcat(ptr, dataptr); | |
return tmp; | |
} | |
void debug(const char *fmt, ...) { | |
va_list args; | |
size_t len; | |
char *space; | |
va_start(args, fmt); | |
len = vsnprintf(0, 0, fmt, args); | |
va_end(args); | |
if ((space = malloc(len + 1)) != 0) { | |
va_start(args, fmt); | |
vsnprintf(space, len + 1, fmt, args); | |
va_end(args); | |
if (DEBUG == 1) { | |
printf("%s\n", space); | |
} | |
free(space); | |
} | |
/* else - what to do if memory allocation fails? */ | |
} | |
static char* get_request_line(char* str, int *request_idx) { | |
int i; | |
char c; | |
char* request = malloc(strlen(str) + 1); | |
strcpy(request, str); | |
int request_len = strlen(request); | |
for (i = *request_idx; *request_idx < request_len; *request_idx = *request_idx + 1) { | |
c = request[*request_idx]; | |
if (c == '\012' || c == '\015') { | |
request[*request_idx] = '\0'; | |
*request_idx = *request_idx + 1; | |
if (c == '\015' && *request_idx < request_len && request[*request_idx] == '\012') { | |
request[*request_idx] = '\0'; | |
*request_idx = *request_idx + 1; | |
} | |
return &(request[i]); | |
} | |
} | |
return (char*) 0; | |
} | |
void rpc_invia(int sockfd, char* msg) { | |
char buff[10000]; | |
char http_header[1000]; | |
sprintf(http_header, "POST %s HTTP/1.1\nHost: %s:%d\nUser-Agent: %s\nContent-Length: %d\nSOAPAction:\nContent-Type: text/xml; charset=utf-8\nConnection: Keep-Alive\n", | |
ACS_URL, ACS_IP, ACS_PORT, UA, (int) strlen(msg)); | |
sprintf(buff, "%s\n%s", http_header, msg); | |
// invio il messaggio | |
send(sockfd, buff, strlen(buff), 0); | |
} | |
int rpc_ricevi(int idcpe, char* data) { | |
//int bytes_read = 0; | |
long content_length = 0; | |
cpes[idcpe].request_idx = 0; | |
// controllo se nei dati letti ci sono gli header http | |
if (strstr(data, "HTTP") != (char*) 0) { | |
// Contiene request con headers HTTP, estraggo content-length | |
cpes[idcpe].http_buf_len = 0; | |
char* line; | |
while ((line = get_request_line(data, &cpes[idcpe].request_idx)) != (char*) 0) { | |
if (strncasecmp(line, "Content-Length:", 15) == 0) { | |
char* cp; | |
cp = &line[15]; | |
cp += strspn(cp, " \t"); | |
content_length = atol(cp); | |
} | |
if (strcmp(line, "") == 0) { | |
break; | |
} | |
} | |
// i dati rimanenti vanno inseriti nel buffer. | |
// questi sono i primi dati da inserire, nel caso ce ne | |
// siano altri li leggerò ai successivi colpi di select e | |
// li inserirò nel buffer (ramo else di questo if) | |
char* soapdata = &(data[cpes[idcpe].request_idx]); | |
cpes[idcpe].http_buf_len = strlen(soapdata); | |
strcpy(cpes[idcpe].http_buf, soapdata); | |
cpes[idcpe].request_idx = 0; | |
} else { | |
// i dati ricevuti dalla select non contengono headers html | |
// e quindi rappresentano altri dati della medesima richiesta HTTP | |
if (cpes[idcpe].http_buf_len < content_length) { | |
// Appendo i dati ricevuti al buffer | |
cpes[idcpe].http_buf = string_append(cpes[idcpe].http_buf, data); | |
cpes[idcpe].http_buf_len += strlen(data); | |
} else { | |
// i dati che sto ricevendo non contengono headers http | |
// e non mi aspetto dati per continuare una request da | |
// questo socket. che cosa cazzo sono allora questi dati ? | |
printf("\n ERROR \n\n"); | |
return 0; | |
} | |
} | |
int to_res; | |
if (cpes[idcpe].http_buf_len < content_length) { | |
to_res = 0; | |
} else { | |
to_res = 1; | |
} | |
return to_res; | |
} | |
void crea_cpes() { | |
int i; | |
for (i = 0; i < NUM_CPE; i++) { | |
int id_cpe = i + 1; | |
sprintf(cpes[i].serial, "%d", id_cpe); | |
sprintf(cpes[i].conn_req, "http://%s:%d/%d", IP, CONNECT_PORT, id_cpe); | |
cpes[i].online = 0; | |
cpes[i].socknum = 0; | |
cpes[i].http_buf_len = 0; | |
cpes[i].http_buf = malloc(60000 * sizeof (char)); | |
strcpy(cpes[i].manufacturer, "PIRELLI BROADBAND SOLUTIONS"); | |
strcpy(cpes[i].sw_ver, "3_RGF09_06_00_00_0013"); | |
} | |
} | |
void *listener(void *threadid) { | |
int sd, psd; | |
struct sockaddr_in name; | |
char buf[1024]; | |
int cc; | |
sd = socket(AF_INET, SOCK_STREAM, 0); | |
name.sin_family = AF_INET; | |
name.sin_addr.s_addr = htonl(INADDR_ANY); | |
name.sin_port = htons(CONNECT_PORT); | |
bind(sd, (struct sockaddr *) & name, sizeof (name)); | |
listen(sd, 1); | |
while (1) { | |
psd = accept(sd, 0, 0); | |
cc = recv(psd, buf, sizeof (buf), 0); | |
buf[cc] = (char) 0; | |
int a = 0; | |
char *line = get_request_line(buf, &a); | |
char *serial = (char*) malloc(20); | |
strncpy(serial, line + 5, 1); // TODO l'estrazione del seriale viene fatto solo lungo un carattere, da fissare | |
// invio la risposta | |
char http[200]; | |
sprintf(http, "HTTP/1.1 200 OK\nUser-Agent: %s\nContent-Length: 0\nConnection: Close\n", UA); | |
send(psd, http, strlen(http), 0); | |
close(psd); | |
printf("Received a connection request for CPE id=%s\n", serial); | |
t_tosay *dati; | |
dati = malloc(sizeof (t_tosay)); | |
dati->serial = serial; | |
//dati->event = EV6; | |
int s = atoi(serial); | |
s--; | |
printf("serial = %d\n", s); | |
cpes[s].event = EV6; | |
// metto in coda l'evento CONNECTION REQUEST | |
queue_enq(q, dati); | |
} | |
pthread_exit(NULL); | |
} | |
void connection_starter(t_tosay *work) { | |
int sockfd; | |
struct sockaddr_in dest; | |
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) | |
perror("Socket"); | |
int x; | |
x = fcntl(sockfd, F_GETFL, 0); | |
fcntl(sockfd, F_SETFL, x | O_NONBLOCK); | |
// inizializzo la struttura per la destinazione del socket | |
bzero(&dest, sizeof (dest)); | |
dest.sin_family = AF_INET; | |
dest.sin_port = htons(ACS_PORT); | |
inet_pton(AF_INET, ACS_IP, &(dest.sin_addr)); | |
// mi connetto all'acs | |
if (connect(sockfd, (struct sockaddr*) & dest, sizeof (dest)) != 0) { | |
// perror("Connect"); | |
} | |
int s = sockfd; | |
// aggiungo il socket a quelli da gestire e tengo traccia del massimo | |
FD_SET(s, &connecting_fds); | |
if (s > fdmax) { | |
fdmax = s; | |
} | |
active_connections++; | |
int ser = atoi(work->serial); | |
ser--; | |
cpes[ser].socknum = s; | |
//printf("CPE #%s starts connections to acs\n", work->serial); | |
} | |
void *connection_selecter(void *roba) { | |
struct timeval start, now; | |
time_t curtime; | |
gettimeofday(&start, NULL); | |
struct timeval *timeout; | |
timeout = malloc(sizeof (struct timeval)); | |
timeout->tv_sec = 0; | |
timeout->tv_usec = 500000; // timeout della select vale 0.5 s | |
char buf[66000]; // buffer for client data | |
int nbytes; | |
//char remoteIP[INET6_ADDRSTRLEN]; | |
int i; | |
//struct addrinfo *p; | |
FD_ZERO(&master); // clear the master and temp sets | |
FD_ZERO(&connecting_fds); | |
FD_ZERO(&read_fds); | |
FD_ZERO(&write_fds); | |
while (1) { | |
/* | |
* In relazione alle connessioni attive e alla lunghezza della coda ho 4 casi: | |
* 1) non ho connessioni e non ho niente in coda: in questo caso non posso fare altro che | |
* attendere indefinitamente in coda (primo ramo dell'if) | |
* 2) non ho connessioni e ho richieste in coda: le scodo (secondo ramo) | |
* 3) ho connessioni ed ho anche richieste in coda: anche in questo caso le scodo (secondo ramo) | |
* 4) ho connessioni ma non ho richieste in coda: vado direttamente alla select, inutile scodare (terzo ramo) | |
*/ | |
if (active_connections == 0 && queue_length(q) == 0) { | |
// non ho connessioni e non ho messaggi in coda, attendo in coda in modo indefinito | |
printf("No active connections, waiting\n"); | |
t_tosay *work = queue_deq(q); | |
connection_starter(work); | |
} else if (active_connections == 0 || (active_connections > 0 && queue_length(q) > 0)) { | |
// non ho connessioni ma ci sono richieste, allora le scodo tutte | |
// oppure ho delle connessioni e ho anche delle richieste in coda: anche in questo caso le scodo tutte | |
// Per quanto riguarda l'operazione di scodamento continuo a farla finchè la coda è piena, fino | |
// ad avere non più di NUM_PARALLEL_CPES connessioni contemporanee e soprattutto non scodo | |
// mai più di 3 richieste di seguito, in modo tale da lasciare sempre lo spazio alla select per gestire | |
// i socket attivi. Se non ve ne fossero, la select esce dopo 0.5s e ritorno a questo if di controllo | |
while (queue_length(q) > 0 && active_connections < NUM_PARALLEL_CPES) { | |
t_tosay *work = queue_deq(q); | |
connection_starter(work); | |
} | |
} else { | |
// ho delle connessioni da gestire ma nessuna richiesta in coda: vado direttamente alla select | |
} | |
gettimeofday(&now, NULL); | |
double elapsed; | |
elapsed = (now.tv_sec + now.tv_usec / 1000000.0 - start.tv_sec - start.tv_usec / 1000000.0) * 1000; // elapsed in ms | |
printf("now.sec=%d now.usec=%d start.sec=%d start.usec=%d elapsed=%f\n", now.tv_sec, now.tv_usec, start.tv_sec, start.tv_usec, elapsed); | |
if (elapsed > 1000) { | |
start = now; | |
printf("stampa (elapsed: %f)\n", elapsed); | |
} | |
read_fds = master; // copy it | |
write_fds = connecting_fds; // copy it | |
int res = select(fdmax + 1, &read_fds, &write_fds, NULL, timeout); // la select si sospende al max per 0.5 s | |
if (res == -1) { | |
// errore nella select | |
perror("select"); | |
exit(4); | |
} else if (res == 0) { | |
// res è zero, significa un timeout senza che vi siano socket pronti per la lettura | |
} else { | |
// res è maggiore di zero: ci sono dei socket da cercare | |
for (i = 0; i <= fdmax; i++) { | |
if (FD_ISSET(i, &read_fds)) { // we got one!! | |
// handle data from a client | |
bzero(buf, 60000); | |
if ((nbytes = recv(i, buf, sizeof buf, 0)) <= 0) { | |
// got error or connection closed by client | |
if (nbytes == 0) { | |
// connection closed | |
printf("socket %d hung up\n", i); | |
} else { | |
perror("recv"); | |
} | |
close(i); // bye! | |
active_connections--; | |
FD_CLR(i, &master); // remove from master set | |
} else { | |
// we got some data from a client and need to | |
// "compose" http request from these datas | |
// cerco l'id dell'array cpes corrispondente a questo socket | |
int cpe_id = 0; | |
int j; | |
for (j = 0; j <= NUM_CPE; j++) { | |
if (cpes[j].socknum == i) { | |
cpe_id = j; | |
break; | |
} | |
} | |
// compongo la richiesta http. ritorna 1 se ha ricevuto tutto, | |
// 0 se mancano dei dati che leggerò al prossimo giro di select | |
int res = rpc_ricevi(cpe_id, buf); | |
if (res == 1) { | |
// ho ricevuto tutto in un colpo solo | |
if (strstr(cpes[cpe_id].http_buf, "InformResponse") != NULL) { | |
debug("IN #%d InformResponse", cpe_id + 1); | |
// invio la post vuota | |
char *empty = ""; | |
rpc_invia(i, empty); | |
debug("OUT #%d Empty Post\n", cpe_id + 1); | |
} else if (strstr(cpes[cpe_id].http_buf, "GetParameterValues") != NULL) { | |
char *getparametervalues_r = getparametervalues_response(); | |
rpc_invia(i, getparametervalues_r); | |
} else if (strcmp(cpes[cpe_id].http_buf, "") == 0) { | |
// chiudo il socket | |
debug("IN #%d 204 No-Content\n", cpe_id + 1); | |
close(i); | |
FD_CLR(i, &master); // remove from master set | |
active_connections--; | |
debug("OUT #%d Closing connection...\n", cpe_id + 1); | |
} else { | |
printf("ho ricevuto un messaggio che non so interpretare\n"); | |
printf("%s\n", buf); | |
exit(1); | |
} | |
} else { | |
// i dati ricevuti non sono sufficienti a completare la richiesta, | |
// devo aspettare un altro giro di select | |
printf("ho ricevuto dei dati che non sono completi\n"); | |
} | |
} | |
} // END handle data from client | |
if (FD_ISSET(i, &write_fds)) { | |
// la connect è terminata | |
// cerco l'id dell'array cpes corrispondente a questo socket | |
int cpe_id = 0; | |
int j; | |
for (j = 0; j <= NUM_CPE; j++) { | |
if (cpes[j].socknum == i) { | |
cpe_id = j; | |
break; | |
} | |
} | |
int ser = cpe_id; | |
debug("OUT #%d Inform\n", cpe_id + 1); | |
char *inform_msg = inform(cpes[ser].manufacturer, "1", cpes[ser].event, "NGRG 2009", cpes[ser].sw_ver, cpes[ser].conn_req, IP); | |
// invio la inform | |
rpc_invia(i, inform_msg); | |
FD_CLR(i, &connecting_fds); // tolto il socket da quelli a cui mi devo connettere | |
FD_SET(i, &master); | |
} // send data to client | |
} // END looping through file descriptors | |
} // END ramo else dell'if sulla select | |
} // END while (1) | |
} | |
void scheduler() { | |
// avvia tutte le cpe con eventcode BOOTSTRAP | |
int i; | |
for (i = 0; i < NUM_CPE; i++) { | |
t_tosay *dati; | |
dati = malloc(sizeof (t_tosay)); | |
dati->serial = cpes[i].serial; | |
//dati->event = EV0; | |
cpes[i].event = EV0; | |
queue_enq(q, dati); | |
} | |
// continua a schedulare periodic o reboot casuali | |
int cont = 0; | |
while (1) { | |
cont++; | |
if (cont == PERIODIC_INTERVAL) { | |
cont = 0; | |
for (i = 0; i < NUM_CPE; i++) { | |
t_tosay *dati; | |
dati = malloc(sizeof (t_tosay)); | |
dati->serial = cpes[i].serial; | |
//dati->event = EV2; | |
cpes[i].event = EV2; | |
queue_enq(q, dati); | |
} | |
} | |
sleep(1); | |
} | |
} | |
int main(int argc, char *argv[]) { | |
printf("%s\n", UA); | |
printf("Copyright (C) 2010 Luca Cervasio\n"); | |
printf("Created by Luca Cervasio <[email protected]>\n"); | |
printf("All rights reserved (C) 2010 Luca Cervasio\n"); | |
printf("You're not allowed to copy or redistribute this\n"); | |
printf("software without the author's consent\n"); | |
printf("==================================================\n"); | |
printf("Simulator is running with %d CPEs and %d maximum parallel connections\n", NUM_CPE, NUM_PARALLEL_CPES); | |
int debug; | |
int port; | |
char* ip; | |
int num_cpe; | |
int index; | |
int c; | |
while ((c = getopt(argc, argv, "n:i:p:d")) != -1) | |
switch (c) { | |
case 'n': | |
num_cpe = optarg; | |
break; | |
case 'i': | |
ip = optarg; | |
break; | |
case 'p': | |
port = optarg; | |
break; | |
case 'd': | |
debug = 1; | |
break; | |
case '?': | |
if (optopt == 'c') | |
fprintf(stderr, "Option -%c requires an argument.\n", optopt); | |
else if (isprint(optopt)) | |
fprintf(stderr, "Unknown option `-%c'.\n", optopt); | |
else | |
fprintf(stderr, | |
"Unknown option character `\\x%x'.\n", | |
optopt); | |
return 1; | |
default: | |
abort(); | |
} | |
for (index = optind; index < argc; index++) | |
printf("Non-option argument %s\n", argv[index]); | |
q = queue_init(); | |
queue_limit(q, 2500); // setting queue depth | |
int r1, r2; | |
pthread_t *thread1 = malloc(sizeof (pthread_t)); | |
pthread_t *thread2 = malloc(sizeof (pthread_t)); | |
long msg1, msg2; | |
// instanzio gli oggetti CPEs | |
crea_cpes(); | |
// avvio il listener | |
r1 = pthread_create(thread1, NULL, listener, (void *) msg1); | |
// avvio il connection manager (il ciclo della select) | |
r2 = pthread_create(thread2, NULL, connection_selecter, (void *) msg2); | |
// avvio lo scheduler | |
scheduler(); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment