Skip to content

Instantly share code, notes, and snippets.

@nad2000
Created July 8, 2015 02:52
Show Gist options
  • Save nad2000/bda2330205bd5421c301 to your computer and use it in GitHub Desktop.
Save nad2000/bda2330205bd5421c301 to your computer and use it in GitHub Desktop.
quit_handler example in C (signal handling)
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <inttypes.h>
#include <time.h>
#include <signal.h>
#include "eda_common.h"
#include "analytics_db.h"
/* Number of rows written by the (currently disabled) insert benchmark in main(). */
#define TEST_COUNT 1000000
/* Number of pre-generated random samples the insert benchmark cycles through. */
#define TEST_DATA_SAMPLES 1000
/*
 * Count of termination signals received so far; main()'s polling loop runs
 * while this is zero.
 *
 * It is written from a signal handler, so it must be `volatile sig_atomic_t`:
 * that is the only object type the C standard lets a handler modify safely,
 * and `volatile` stops the compiler from caching the value across loop
 * iterations.
 */
volatile sig_atomic_t g_quit = 0;
/* Random per-sample byte and packet counts, filled in by gen_data(). */
static uint32_t burst_bytes[TEST_DATA_SAMPLES], burst_pkts[TEST_DATA_SAMPLES];
/*
 * Signal handler: records that a shutdown was requested.
 *
 * signo is the signal number that fired; it is intentionally ignored because
 * every signal this handler is installed for is treated the same way.
 */
static void quit_handler(int signo)
{
    (void)signo;
    g_quit++;
}
#if 0
/*
 * Read the wall clock with gettimeofday() and return it as a single
 * microsecond count (whole seconds scaled by 1,000,000 plus the
 * sub-second microseconds).
 */
static uint64_t timestamp_us(void)
{
    struct timeval now;
    uint64_t micros;

    gettimeofday(&now, NULL);
    micros = (uint64_t)now.tv_sec * 1000000;
    micros += now.tv_usec;
    return micros;
}
#endif
/*
 * Return a pseudo-random integer in the inclusive range [min, max], drawn
 * from drand48() (seed it with srand48() first for a reproducible or
 * time-varying sequence).
 *
 * If the bounds are passed in the wrong order they are swapped, so the
 * result always lies between the smaller and the larger argument.
 */
static int random_int(int min, int max)
{
    double span;
    long long offset;

    if (max < min) {
        int swap = min;
        min = max;
        max = swap;
    }

    /* Computed in double so that wide ranges cannot overflow int. */
    span = (double)max - (double)min + 1.0;

    /*
     * Truncate the non-negative offset BEFORE adding min. Truncating the sum
     * instead rounds toward zero, which for negative min maps everything in
     * (-1, 0) to 0: that skews the distribution and can return a value
     * above max.
     */
    offset = (long long)(drand48() * span);
    return (int)(min + offset);
}
/*
 * Seed the drand48() generator from the current time and fill both sample
 * tables: each burst_pkts entry gets a value in [0, 1000] and each
 * burst_bytes entry a value in [0, 1000000].
 */
static void gen_data(void)
{
    uint32_t *pkts_slot = burst_pkts;
    uint32_t *bytes_slot = burst_bytes;
    int remaining = TEST_DATA_SAMPLES;

    srand48(time(NULL));
    while (remaining-- > 0) {
        /* Packets first, then bytes, for every sample. */
        *pkts_slot++ = random_int(0, 1000);
        *bytes_slot++ = random_int(0, 1000000);
    }
}
/*
 * Parse the leading decimal number in `text` into *out.
 *
 * Trailing characters after the number are ignored (as the previous
 * sscanf-based parsing did). Returns 0 on success, or -1 when `text` does not
 * start with a number or the value does not fit in a uint32_t (negative
 * input is rejected this way too); *out is left untouched on failure.
 */
static int parse_retention_sec(const char *text, uint32_t *out)
{
    char *end = NULL;
    unsigned long long value;

    errno = 0;
    value = strtoull(text, &end, 10);
    if (end == text || errno == ERANGE || value > UINT32_MAX) {
        return -1;
    }
    *out = (uint32_t)value;
    return 0;
}

/*
 * Read a 32-bit network-order value from a result buffer and return it in
 * host order. memcpy is used instead of dereferencing a casted pointer so the
 * read is valid regardless of the buffer's alignment.
 */
static uint32_t load_be32_field(const void *field)
{
    uint32_t raw;

    memcpy(&raw, field, sizeof raw);
    return ntohl(raw);
}

/* 64-bit counterpart of load_be32_field(). */
static uint64_t load_be64_field(const void *field)
{
    uint64_t raw;

    memcpy(&raw, field, sizeof raw);
    return be64toh(raw);
}

/*
 * Test driver for the analytics DB layer.
 *
 * Environment:
 *   TABLE_NAME     table to try to lock (only used when RETENTION_SEC is set)
 *   RETENTION_SEC  non-zero value enables the lock attempt and the rotation test
 *   PGDATABASE     passed to analytics_db_rotate()
 *
 * After the optional lock/rotation step the program either runs the insert
 * benchmark (compiled out with #if 0) or, by default, polls bandwidth_stats
 * every 5 seconds and prints new rows until SIGQUIT, SIGINT or SIGTERM is
 * received.
 *
 * Returns -1 if the database connection cannot be opened, 0 otherwise.
 * argc and argv are not used.
 */
int main(int argc, char **argv)
{
    /* 0 means "RETENTION_SEC unset or invalid": lock and rotation are skipped. */
    uint32_t retention_sec = 0;
    uint32_t ts = 0;
    uint64_t bytes = 0, pkts = 0;
    analytics_db_conn_t *conn = NULL;
    // specify DB with the environment variable PGDATABASE=...
    const char *conn_info = "connect_timeout=10 application_name=analytics_db_test";
    const char *table_name = getenv("TABLE_NAME");
    const char *retention_env = getenv("RETENTION_SEC");

    (void)argc;
    (void)argv;

    /* Passing NULL to %s is undefined behavior, so substitute a placeholder. */
    printf("\n\nTABLE_NAME: %s\n", table_name != NULL ? table_name : "(null)");
    printf("RETENTION_SEC: %s\n", retention_env != NULL ? retention_env : "(null)");
    if (retention_env != NULL && parse_retention_sec(retention_env, &retention_sec) != 0) {
        fprintf(stderr, "ignoring invalid RETENTION_SEC: %s\n", retention_env);
    }

    conn = analytics_db_conn_open(conn_info, ANALYTICS_DB_CONN_WITH_COPY);
    if (conn == NULL) {
        fprintf(stderr,"cannot connect to DB\n");
        return -1;
    }

    int pg_version = analytics_db_pg_version_num(conn);
    printf( "*** PostgreSQL version %i\n", pg_version);
    /* fputs: the text is not a format string, so do not hand it to printf. */
    fputs(pg_version > 90200 ? "*** > 9.2.0 ***\n" : "*** <= 9.2.0 ***\n", stdout);

    if (retention_sec) {
        if (table_name != NULL) {
            if (analytics_db_try_lock_partition(conn, table_name)) {
                printf( "*** Table %s locked successfully\n", table_name);
            } else {
                printf( "*** Failed to lock %s\n", table_name);
            }
        }
        printf("*** Rotation Testing....\n");
        /* NOTE(review): PGDATABASE may be unset (NULL) here; confirm analytics_db_rotate() accepts that. */
        analytics_db_rotate(getenv("PGDATABASE"), retention_sec);
    }

#if 0
    /*
     * Insert benchmark (disabled). Enabling it also requires enabling
     * timestamp_us(), which is compiled out with its own #if 0.
     */
    const char *col_names[] = {
        "timestamp", "packet_count", "byte_count", "burst_byte_rate", "burst_packet_rate"
    };
    analytics_db_dtype_t col_types[] = {
        ANALYTICS_DB_TYPE_INT32, ANALYTICS_DB_TYPE_INT64, ANALYTICS_DB_TYPE_INT64, ANALYTICS_DB_TYPE_INT32, ANALYTICS_DB_TYPE_INT32
    };
    analytics_db_ins_t *ins_h = analytics_db_insert_prepare(conn, "bandwidth_stats", 5, col_names, col_types);

    if (ins_h != NULL) {
        unsigned int count = 0;
        // generate data first
        gen_data();
        uint64_t start_ts = timestamp_us();
        for (int i = 0; i < TEST_COUNT; i++) {
            int data_idx = i % TEST_DATA_SAMPLES;
            void *values[] = {&ts, &pkts, &bytes, &(burst_bytes[data_idx]), &(burst_pkts[data_idx])};
            /* A zero return is counted as a completed insert. */
            if (!analytics_db_insert(ins_h, values)) {
                count++;
            }
            ts += 1;
            bytes += burst_bytes[data_idx];
            pkts += burst_pkts[data_idx];
        }
        uint64_t duration = timestamp_us() - start_ts;
        fprintf(stdout,"%u inserts done in %" PRIu64 " us\n", count, duration);
        /* Skip the rate line when it would divide by zero. */
        if (count > 0 && duration > 0) {
            fprintf(stdout,"%" PRIu64 " ns/insert, %" PRIu64 " insert/s\n",
                    (duration * 1000) / count, ((uint64_t)count * 1000000) / duration);
        }
        analytics_db_insert_free(ins_h);
    }
    else {
        fprintf(stderr,"cannot get insert handle\n");
    }
#else
    /* Query poller: the single parameter $1 is bound to param_ts. */
    analytics_db_dtype_t param_types[] = {ANALYTICS_DB_TYPE_INT32};
    uint32_t param_ts = 0;
    void *param_value[] = {&param_ts};
    analytics_db_qry_t *qry_h = NULL;

    signal(SIGQUIT, quit_handler);
    signal(SIGINT, quit_handler);
    signal(SIGTERM, quit_handler);

    qry_h = analytics_db_query_prepare(conn, "SELECT * FROM bandwidth_stats where timestamp > $1", 1, param_types);
    if (qry_h != NULL) {
        uint32_t last_ts = param_ts;
        while (!g_quit) {
            int n_rows = 0, n_cols = 0;
            /* Re-read the last 10 timestamps so that rows are fetched with some overlap. */
            if (last_ts > 10) {
                param_ts = last_ts - 10;
            }
            /* A zero return is treated as success here. */
            if (!analytics_db_query(qry_h, param_value, &n_rows, &n_cols, 1)) {
                uint32_t prev_ts = 0;
                fprintf(stdout,"************************************************\n");
                for (int i = 0; i < n_rows; i++) {
                    /*
                     * Columns are decoded as big-endian integers.
                     * NOTE(review): a NULL field pointer is not handled; confirm
                     * analytics_db_query_res_get() never returns NULL for these columns.
                     */
                    ts = load_be32_field((const void *)analytics_db_query_res_get(qry_h, i, 0));
                    pkts = load_be64_field((const void *)analytics_db_query_res_get(qry_h, i, 1));
                    bytes = load_be64_field((const void *)analytics_db_query_res_get(qry_h, i, 2));
                    uint32_t mbytes = load_be32_field((const void *)analytics_db_query_res_get(qry_h, i, 3));
                    uint32_t mpkts = load_be32_field((const void *)analytics_db_query_res_get(qry_h, i, 4));
                    fprintf(stdout,"%u: %" PRIu64 ", %" PRIu64 ", %u, %u \n", ts, pkts, bytes, mbytes, mpkts);
                    if (ts == prev_ts) {
                        fprintf(stdout,"duplicate detected!! continue?");
                        /* The prompt has no newline, so flush it before blocking on input. */
                        fflush(stdout);
                        /* Wait for one character; EOF simply lets the loop continue. */
                        (void)getchar();
                    }
                    prev_ts = ts;
                    last_ts = ts;
                }
            }
            else {
                fprintf(stderr,"cannot query\n");
            }
            sleep(5);
        }
        analytics_db_query_free(qry_h);
    }
    else {
        fprintf(stderr,"cannot get query handle\n");
    }
#endif
    analytics_db_conn_close(&conn);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment