Skip to content

Instantly share code, notes, and snippets.

@lucacervasio
Last active December 20, 2015 12:29
Show Gist options
  • Save lucacervasio/6131434 to your computer and use it in GitHub Desktop.
Save lucacervasio/6131434 to your computer and use it in GitHub Desktop.
Hercules: a multiple tcp client implementation with socket select
/*
*
* Hercules v0.2
* Copyright (C) 2010 Luca Cervasio
* Created by Luca Cervasio <[email protected]>
* All rights reserved (C) 2010 Luca Cervasio
* You're not allowed to copy or redistribute this software without the author's consent
*
*/
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <resolv.h>
#include "thrqueue.h"
#include "soap.h"
#include <netinet/in.h>
#include <sys/time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <time.h>
#include <stdarg.h>
#include <sys/time.h>
#include <getopt.h>
#define NUM_CPE 1
#define NUM_PARALLEL_CPES 1
#define ACS_IP "127.0.0.1"
#define ACS_PORT 8080
#define ACS_URL "/acs/index"
#define UA "Hercules v0.2 by Luca Cervasio"
#define IP "10.19.0.2"
#define CONNECT_PORT 9600
#define PERIODIC_INTERVAL 120
#define DEBUG 0
typedef struct cpe {
char serial[10];
char conn_req[100];
int online;
char manufacturer[50];
char sw_ver[50];
int socknum;
char *http_buf;
int http_buf_len;
int request_idx;
char *event;
} t_cpe;
typedef struct tosay {
char *serial;
//char *event;
} t_tosay;
struct Queue *q;
t_cpe cpes[NUM_CPE];
int active_connections = 0;
int fdmax;
fd_set master;
fd_set connecting_fds;
fd_set read_fds;
fd_set write_fds;
char* string_append(char* ptr, char* dataptr) {
char * tmp = strcat(ptr, dataptr);
return tmp;
}
void debug(const char *fmt, ...) {
va_list args;
size_t len;
char *space;
va_start(args, fmt);
len = vsnprintf(0, 0, fmt, args);
va_end(args);
if ((space = malloc(len + 1)) != 0) {
va_start(args, fmt);
vsnprintf(space, len + 1, fmt, args);
va_end(args);
if (DEBUG == 1) {
printf("%s\n", space);
}
free(space);
}
/* else - what to do if memory allocation fails? */
}
static char* get_request_line(char* str, int *request_idx) {
int i;
char c;
char* request = malloc(strlen(str) + 1);
strcpy(request, str);
int request_len = strlen(request);
for (i = *request_idx; *request_idx < request_len; *request_idx = *request_idx + 1) {
c = request[*request_idx];
if (c == '\012' || c == '\015') {
request[*request_idx] = '\0';
*request_idx = *request_idx + 1;
if (c == '\015' && *request_idx < request_len && request[*request_idx] == '\012') {
request[*request_idx] = '\0';
*request_idx = *request_idx + 1;
}
return &(request[i]);
}
}
return (char*) 0;
}
void rpc_invia(int sockfd, char* msg) {
char buff[10000];
char http_header[1000];
sprintf(http_header, "POST %s HTTP/1.1\nHost: %s:%d\nUser-Agent: %s\nContent-Length: %d\nSOAPAction:\nContent-Type: text/xml; charset=utf-8\nConnection: Keep-Alive\n",
ACS_URL, ACS_IP, ACS_PORT, UA, (int) strlen(msg));
sprintf(buff, "%s\n%s", http_header, msg);
// invio il messaggio
send(sockfd, buff, strlen(buff), 0);
}
int rpc_ricevi(int idcpe, char* data) {
//int bytes_read = 0;
long content_length = 0;
cpes[idcpe].request_idx = 0;
// controllo se nei dati letti ci sono gli header http
if (strstr(data, "HTTP") != (char*) 0) {
// Contiene request con headers HTTP, estraggo content-length
cpes[idcpe].http_buf_len = 0;
char* line;
while ((line = get_request_line(data, &cpes[idcpe].request_idx)) != (char*) 0) {
if (strncasecmp(line, "Content-Length:", 15) == 0) {
char* cp;
cp = &line[15];
cp += strspn(cp, " \t");
content_length = atol(cp);
}
if (strcmp(line, "") == 0) {
break;
}
}
// i dati rimanenti vanno inseriti nel buffer.
// questi sono i primi dati da inserire, nel caso ce ne
// siano altri li leggerò ai successivi colpi di select e
// li inserirò nel buffer (ramo else di questo if)
char* soapdata = &(data[cpes[idcpe].request_idx]);
cpes[idcpe].http_buf_len = strlen(soapdata);
strcpy(cpes[idcpe].http_buf, soapdata);
cpes[idcpe].request_idx = 0;
} else {
// i dati ricevuti dalla select non contengono headers html
// e quindi rappresentano altri dati della medesima richiesta HTTP
if (cpes[idcpe].http_buf_len < content_length) {
// Appendo i dati ricevuti al buffer
cpes[idcpe].http_buf = string_append(cpes[idcpe].http_buf, data);
cpes[idcpe].http_buf_len += strlen(data);
} else {
// i dati che sto ricevendo non contengono headers http
// e non mi aspetto dati per continuare una request da
// questo socket. che cosa cazzo sono allora questi dati ?
printf("\n ERROR \n\n");
return 0;
}
}
int to_res;
if (cpes[idcpe].http_buf_len < content_length) {
to_res = 0;
} else {
to_res = 1;
}
return to_res;
}
void crea_cpes() {
int i;
for (i = 0; i < NUM_CPE; i++) {
int id_cpe = i + 1;
sprintf(cpes[i].serial, "%d", id_cpe);
sprintf(cpes[i].conn_req, "http://%s:%d/%d", IP, CONNECT_PORT, id_cpe);
cpes[i].online = 0;
cpes[i].socknum = 0;
cpes[i].http_buf_len = 0;
cpes[i].http_buf = malloc(60000 * sizeof (char));
strcpy(cpes[i].manufacturer, "PIRELLI BROADBAND SOLUTIONS");
strcpy(cpes[i].sw_ver, "3_RGF09_06_00_00_0013");
}
}
void *listener(void *threadid) {
int sd, psd;
struct sockaddr_in name;
char buf[1024];
int cc;
sd = socket(AF_INET, SOCK_STREAM, 0);
name.sin_family = AF_INET;
name.sin_addr.s_addr = htonl(INADDR_ANY);
name.sin_port = htons(CONNECT_PORT);
bind(sd, (struct sockaddr *) & name, sizeof (name));
listen(sd, 1);
while (1) {
psd = accept(sd, 0, 0);
cc = recv(psd, buf, sizeof (buf), 0);
buf[cc] = (char) 0;
int a = 0;
char *line = get_request_line(buf, &a);
char *serial = (char*) malloc(20);
strncpy(serial, line + 5, 1); // TODO l'estrazione del seriale viene fatto solo lungo un carattere, da fissare
// invio la risposta
char http[200];
sprintf(http, "HTTP/1.1 200 OK\nUser-Agent: %s\nContent-Length: 0\nConnection: Close\n", UA);
send(psd, http, strlen(http), 0);
close(psd);
printf("Received a connection request for CPE id=%s\n", serial);
t_tosay *dati;
dati = malloc(sizeof (t_tosay));
dati->serial = serial;
//dati->event = EV6;
int s = atoi(serial);
s--;
printf("serial = %d\n", s);
cpes[s].event = EV6;
// metto in coda l'evento CONNECTION REQUEST
queue_enq(q, dati);
}
pthread_exit(NULL);
}
void connection_starter(t_tosay *work) {
int sockfd;
struct sockaddr_in dest;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
perror("Socket");
int x;
x = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, x | O_NONBLOCK);
// inizializzo la struttura per la destinazione del socket
bzero(&dest, sizeof (dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(ACS_PORT);
inet_pton(AF_INET, ACS_IP, &(dest.sin_addr));
// mi connetto all'acs
if (connect(sockfd, (struct sockaddr*) & dest, sizeof (dest)) != 0) {
// perror("Connect");
}
int s = sockfd;
// aggiungo il socket a quelli da gestire e tengo traccia del massimo
FD_SET(s, &connecting_fds);
if (s > fdmax) {
fdmax = s;
}
active_connections++;
int ser = atoi(work->serial);
ser--;
cpes[ser].socknum = s;
//printf("CPE #%s starts connections to acs\n", work->serial);
}
void *connection_selecter(void *roba) {
struct timeval start, now;
time_t curtime;
gettimeofday(&start, NULL);
struct timeval *timeout;
timeout = malloc(sizeof (struct timeval));
timeout->tv_sec = 0;
timeout->tv_usec = 500000; // timeout della select vale 0.5 s
char buf[66000]; // buffer for client data
int nbytes;
//char remoteIP[INET6_ADDRSTRLEN];
int i;
//struct addrinfo *p;
FD_ZERO(&master); // clear the master and temp sets
FD_ZERO(&connecting_fds);
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
while (1) {
/*
* In relazione alle connessioni attive e alla lunghezza della coda ho 4 casi:
* 1) non ho connessioni e non ho niente in coda: in questo caso non posso fare altro che
* attendere indefinitamente in coda (primo ramo dell'if)
* 2) non ho connessioni e ho richieste in coda: le scodo (secondo ramo)
* 3) ho connessioni ed ho anche richieste in coda: anche in questo caso le scodo (secondo ramo)
* 4) ho connessioni ma non ho richieste in coda: vado direttamente alla select, inutile scodare (terzo ramo)
*/
if (active_connections == 0 && queue_length(q) == 0) {
// non ho connessioni e non ho messaggi in coda, attendo in coda in modo indefinito
printf("No active connections, waiting\n");
t_tosay *work = queue_deq(q);
connection_starter(work);
} else if (active_connections == 0 || (active_connections > 0 && queue_length(q) > 0)) {
// non ho connessioni ma ci sono richieste, allora le scodo tutte
// oppure ho delle connessioni e ho anche delle richieste in coda: anche in questo caso le scodo tutte
// Per quanto riguarda l'operazione di scodamento continuo a farla finchè la coda è piena, fino
// ad avere non più di NUM_PARALLEL_CPES connessioni contemporanee e soprattutto non scodo
// mai più di 3 richieste di seguito, in modo tale da lasciare sempre lo spazio alla select per gestire
// i socket attivi. Se non ve ne fossero, la select esce dopo 0.5s e ritorno a questo if di controllo
while (queue_length(q) > 0 && active_connections < NUM_PARALLEL_CPES) {
t_tosay *work = queue_deq(q);
connection_starter(work);
}
} else {
// ho delle connessioni da gestire ma nessuna richiesta in coda: vado direttamente alla select
}
gettimeofday(&now, NULL);
double elapsed;
elapsed = (now.tv_sec + now.tv_usec / 1000000.0 - start.tv_sec - start.tv_usec / 1000000.0) * 1000; // elapsed in ms
printf("now.sec=%d now.usec=%d start.sec=%d start.usec=%d elapsed=%f\n", now.tv_sec, now.tv_usec, start.tv_sec, start.tv_usec, elapsed);
if (elapsed > 1000) {
start = now;
printf("stampa (elapsed: %f)\n", elapsed);
}
read_fds = master; // copy it
write_fds = connecting_fds; // copy it
int res = select(fdmax + 1, &read_fds, &write_fds, NULL, timeout); // la select si sospende al max per 0.5 s
if (res == -1) {
// errore nella select
perror("select");
exit(4);
} else if (res == 0) {
// res è zero, significa un timeout senza che vi siano socket pronti per la lettura
} else {
// res è maggiore di zero: ci sono dei socket da cercare
for (i = 0; i <= fdmax; i++) {
if (FD_ISSET(i, &read_fds)) { // we got one!!
// handle data from a client
bzero(buf, 60000);
if ((nbytes = recv(i, buf, sizeof buf, 0)) <= 0) {
// got error or connection closed by client
if (nbytes == 0) {
// connection closed
printf("socket %d hung up\n", i);
} else {
perror("recv");
}
close(i); // bye!
active_connections--;
FD_CLR(i, &master); // remove from master set
} else {
// we got some data from a client and need to
// "compose" http request from these datas
// cerco l'id dell'array cpes corrispondente a questo socket
int cpe_id = 0;
int j;
for (j = 0; j <= NUM_CPE; j++) {
if (cpes[j].socknum == i) {
cpe_id = j;
break;
}
}
// compongo la richiesta http. ritorna 1 se ha ricevuto tutto,
// 0 se mancano dei dati che leggerò al prossimo giro di select
int res = rpc_ricevi(cpe_id, buf);
if (res == 1) {
// ho ricevuto tutto in un colpo solo
if (strstr(cpes[cpe_id].http_buf, "InformResponse") != NULL) {
debug("IN #%d InformResponse", cpe_id + 1);
// invio la post vuota
char *empty = "";
rpc_invia(i, empty);
debug("OUT #%d Empty Post\n", cpe_id + 1);
} else if (strstr(cpes[cpe_id].http_buf, "GetParameterValues") != NULL) {
char *getparametervalues_r = getparametervalues_response();
rpc_invia(i, getparametervalues_r);
} else if (strcmp(cpes[cpe_id].http_buf, "") == 0) {
// chiudo il socket
debug("IN #%d 204 No-Content\n", cpe_id + 1);
close(i);
FD_CLR(i, &master); // remove from master set
active_connections--;
debug("OUT #%d Closing connection...\n", cpe_id + 1);
} else {
printf("ho ricevuto un messaggio che non so interpretare\n");
printf("%s\n", buf);
exit(1);
}
} else {
// i dati ricevuti non sono sufficienti a completare la richiesta,
// devo aspettare un altro giro di select
printf("ho ricevuto dei dati che non sono completi\n");
}
}
} // END handle data from client
if (FD_ISSET(i, &write_fds)) {
// la connect è terminata
// cerco l'id dell'array cpes corrispondente a questo socket
int cpe_id = 0;
int j;
for (j = 0; j <= NUM_CPE; j++) {
if (cpes[j].socknum == i) {
cpe_id = j;
break;
}
}
int ser = cpe_id;
debug("OUT #%d Inform\n", cpe_id + 1);
char *inform_msg = inform(cpes[ser].manufacturer, "1", cpes[ser].event, "NGRG 2009", cpes[ser].sw_ver, cpes[ser].conn_req, IP);
// invio la inform
rpc_invia(i, inform_msg);
FD_CLR(i, &connecting_fds); // tolto il socket da quelli a cui mi devo connettere
FD_SET(i, &master);
} // send data to client
} // END looping through file descriptors
} // END ramo else dell'if sulla select
} // END while (1)
}
void scheduler() {
// avvia tutte le cpe con eventcode BOOTSTRAP
int i;
for (i = 0; i < NUM_CPE; i++) {
t_tosay *dati;
dati = malloc(sizeof (t_tosay));
dati->serial = cpes[i].serial;
//dati->event = EV0;
cpes[i].event = EV0;
queue_enq(q, dati);
}
// continua a schedulare periodic o reboot casuali
int cont = 0;
while (1) {
cont++;
if (cont == PERIODIC_INTERVAL) {
cont = 0;
for (i = 0; i < NUM_CPE; i++) {
t_tosay *dati;
dati = malloc(sizeof (t_tosay));
dati->serial = cpes[i].serial;
//dati->event = EV2;
cpes[i].event = EV2;
queue_enq(q, dati);
}
}
sleep(1);
}
}
int main(int argc, char *argv[]) {
printf("%s\n", UA);
printf("Copyright (C) 2010 Luca Cervasio\n");
printf("Created by Luca Cervasio <[email protected]>\n");
printf("All rights reserved (C) 2010 Luca Cervasio\n");
printf("You're not allowed to copy or redistribute this\n");
printf("software without the author's consent\n");
printf("==================================================\n");
printf("Simulator is running with %d CPEs and %d maximum parallel connections\n", NUM_CPE, NUM_PARALLEL_CPES);
int debug;
int port;
char* ip;
int num_cpe;
int index;
int c;
while ((c = getopt(argc, argv, "n:i:p:d")) != -1)
switch (c) {
case 'n':
num_cpe = optarg;
break;
case 'i':
ip = optarg;
break;
case 'p':
port = optarg;
break;
case 'd':
debug = 1;
break;
case '?':
if (optopt == 'c')
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
else if (isprint(optopt))
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
else
fprintf(stderr,
"Unknown option character `\\x%x'.\n",
optopt);
return 1;
default:
abort();
}
for (index = optind; index < argc; index++)
printf("Non-option argument %s\n", argv[index]);
q = queue_init();
queue_limit(q, 2500); // setting queue depth
int r1, r2;
pthread_t *thread1 = malloc(sizeof (pthread_t));
pthread_t *thread2 = malloc(sizeof (pthread_t));
long msg1, msg2;
// instanzio gli oggetti CPEs
crea_cpes();
// avvio il listener
r1 = pthread_create(thread1, NULL, listener, (void *) msg1);
// avvio il connection manager (il ciclo della select)
r2 = pthread_create(thread2, NULL, connection_selecter, (void *) msg2);
// avvio lo scheduler
scheduler();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment