-
-
Save pfpmeijers/e35e1bd74fbef2d165c9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <time.h> | |
#include <stdlib.h> | |
#include <stdio.h> | |
#include <string.h> | |
#include <stdlib.h> | |
#include <unistd.h> | |
#ifdef _WIN32 | |
#include <windows.h> | |
#endif | |
#define MIN(a, b) (((a) < (b)) ? (a) : (b)) | |
#define MAX(a, b) (((a) > (b)) ? (a) : (b)) | |
// Select between supported communication frameworks: | |
#ifndef OPC | |
#define EPICS | |
#endif | |
// Include communication framework header. | |
#if defined(OPC) | |
#include "open62541.h" | |
#elif defined(EPICS) | |
#include "cadef.h" | |
#endif | |
// Framework related globals. | |
#if defined(OPC) | |
static UA_ReadRequest req; | |
static UA_Client* client; | |
#elif defined(EPICS) | |
#define CA_TIMEOUT_TIME (20.0) // seconds timeout for EPICS | |
#define MAX_T_DISCONN (10) // total accepted disconnects | |
#define MAX_CONN_RETRIES (20) // retries after 1 disconnect (a 1 second) | |
#define MAX_BENCH_ARRAY (1<<20) // max size of benchmark array in longs of 4 bytes | |
#define MAX_BENCH_CHANNELS (1<<20) // max number of benchmark channels | |
typedef struct { | |
char * pvname; | |
chid chid; | |
} channel_struct; | |
static channel_struct channels [MAX_BENCH_CHANNELS]; | |
#endif | |
// Define parameters/defaults. | |
static char* variable_name = NULL; | |
static int array_size = 1; | |
static int array_item_size = 4; // int32 assumed. Not configurable yet. | |
static int transaction_size = 1; | |
static int transaction_count = 1000; | |
static float runout_duration = 0.0; | |
static int verify = 0; | |
static int verbose = 0; | |
static int sync_begin = 0; | |
static int sync_end = 0; | |
#if defined(OPC) | |
static char* protocol = "opc.tcp://"; | |
static char* server_address = "localhost"; | |
static char* port_nr = "48010"; | |
static int namespace_index = 1; | |
#endif | |
const char* measure_trigger = "measure"; | |
//--------------------------------------------------------------------------------------------------------------------- | |
// Argument parsing and usage. | |
// | |
static void print_usage(char** argv, char *title) | |
{ | |
if(title) | |
{ printf ("%s\n", title); | |
} | |
printf("usage: %s\n" | |
#if defined(OPC) | |
" [-server server_address ] (default: %s)\n" | |
" [-port port_nr ] (default: %s)\n" | |
" [-ns namespace_index ] (default: %d)\n" | |
#endif | |
" [-var variable_name ] (overrules name constructed from array size)\n" | |
" [-size array_size ] (default: %d, unit: # int32 items, to construct variable name)\n" | |
" [-vars transaction_size] (default: %d, unit: # variables)\n" | |
" [-repeat nr_transactions ] (default: %d)\n" | |
" [-runout duration ] (default: %g)\n" | |
" [-verify ] (verify results)\n" | |
" [-sync_begin ] (wait on '%s' trigger file to sync)\n" | |
" [-sync_end ] (continue as long as '%s' trigger file present)\n" | |
" [-v ] (verbose)\n" | |
" [-h ] (help)\n", | |
argv[0], | |
#if defined(OPC) | |
server_address, port_nr, namespace_index, | |
#endif | |
array_size, transaction_size, transaction_count, runout_duration, | |
measure_trigger, measure_trigger | |
); | |
exit(-1); | |
} | |
static void parse_arguments(int argc, char** argv) | |
{ | |
// Parse arguments. | |
int i; | |
for(i = 1; i < argc; i++) | |
{ if(strcmp(argv[i], "-h") == 0) | |
{ print_usage(argv, NULL); | |
} | |
else if(strcmp(argv[i], "-v") == 0) | |
{ verbose = 1; | |
} | |
else if(strcmp(argv[i], "-verify") == 0) | |
{ verify = 1; | |
} | |
else if(strcmp(argv[i], "-sync_begin") == 0) | |
{ sync_begin = 1; | |
} | |
else if(strcmp(argv[i], "-sync_end") == 0) | |
{ sync_end = 1; | |
} | |
else if(i < argc - 1) | |
{ | |
#if defined(OPC) | |
if(strcmp(argv[i], "-server") == 0) | |
{ server_address = argv[i + 1]; | |
} | |
else if(strcmp(argv[i], "-port") == 0) | |
{ port_nr = argv[i + 1]; | |
} | |
else if(strcmp(argv[i], "-ns") == 0) | |
{ namespace_index = atoi(argv[i + 1]); | |
} | |
else | |
#endif | |
if(strcmp(argv[i], "-var") == 0) | |
{ variable_name = argv[i + 1]; | |
} | |
else if(strcmp(argv[i], "-size") == 0) | |
{ array_size = atoi(argv[i + 1]); | |
} | |
else if(strcmp(argv[i], "-vars") == 0) | |
{ transaction_size = atoi(argv[i + 1]); | |
} | |
else if(strcmp(argv[i], "-repeat") == 0) | |
{ transaction_count = atoi(argv[i + 1]); | |
} | |
else if(strcmp(argv[i], "-runout") == 0) | |
{ runout_duration = atof(argv[i + 1]); | |
} | |
} | |
} | |
if(variable_name == NULL) | |
{ variable_name = malloc(128); | |
#if defined(OPC) | |
sprintf(variable_name, "var%d", array_size); | |
#elif defined(EPICS) | |
sprintf(variable_name, "VAR%d.VAL", array_size); | |
#endif | |
} | |
else | |
{ // Make a string copy, such that it can be freed during cleanup. Needed to align with the case above. | |
char* s = malloc(strlen(variable_name) + 1); | |
strcpy(s, variable_name); | |
variable_name = s; | |
} | |
if(verbose) | |
{ printf("%s %s %s %s " | |
#if defined(OPC) | |
"-server %s -port %s -ns %d " | |
#endif | |
"-var %s -size %d -vars %d -repeat %d -runout %g\n", | |
argv[0], verify ? "-verify" : "", sync_begin ? "-sync_begin" : "", sync_end ? "-sync_end" : "", | |
#if defined(OPC) | |
server_address, port_nr, namespace_index, | |
#endif | |
variable_name, array_size, transaction_size, transaction_count, runout_duration); | |
} | |
#if defined(EPICS) | |
{ | |
// array size must be a power of 2 | |
int i = 1; | |
int found = 0; | |
while (i <= MAX_BENCH_ARRAY) { | |
if (array_size == i) { | |
found = 1; | |
} | |
i *= 2; | |
} | |
if (!found) { | |
printf ("EPICS Array size must be a power of 2\n"); exit (1); | |
} | |
} | |
if (array_size > MAX_BENCH_ARRAY) { | |
printf ("EPICS Array size cannot exceed %d\n", MAX_BENCH_ARRAY); exit (1); | |
} | |
if (transaction_size > MAX_BENCH_CHANNELS) { | |
printf ("EPICS Number of channels cannot exceed %d\n", MAX_BENCH_CHANNELS); exit (1); | |
} | |
#endif | |
} | |
//--------------------------------------------------------------------------------------------------------------------- | |
#if defined(EPICS) | |
// reconnect if status indicates disconnect ------------------------------- | |
static int handle_disconnect (int status, channel_struct *pch) | |
{ | |
static int total_disconnects = 0; | |
int tries = 0; | |
if ((status==ECA_DISCONN) || (status == ECA_TIMEOUT)) { | |
total_disconnects++; | |
while ((status != ECA_NORMAL) && | |
(total_disconnects < MAX_T_DISCONN) && | |
(tries++ < MAX_CONN_RETRIES) ) { | |
status = ca_create_channel(pch->pvname, NULL, NULL, 10, &pch->chid); | |
if (status == ECA_NORMAL) { | |
status = ca_pend_io (CA_TIMEOUT_TIME); | |
} | |
} | |
if (status != ECA_NORMAL) { | |
printf ("ERROR: Failed to reconnect after disconnect; pv %s retries = %d; total disconnects = %d\n", | |
pch->pvname, tries, total_disconnects); | |
} | |
exit (1); | |
} | |
return status; | |
} | |
#endif | |
//--------------------------------------------------------------------------------------------------------------------- | |
#if defined(_WIN32) | |
typedef LARGE_INTEGER Clock_t; | |
static void get_clock(Clock_t* c) | |
{ QueryPerformanceCounter(c); | |
} | |
static float get_clock_diff(Clock_t* c1, Clock_t* c2) | |
{ LARGE_INTEGER f; | |
QueryPerformanceFrequency(&f); | |
return (float)(c1->QuadPart - c2->QuadPart) / f.QuadPart; | |
} | |
#elif _POSIX_C_SOURCE >= 199309L | |
typedef struct timespec Clock_t; | |
static void get_clock(Clock_t* c) | |
{ clock_gettime(CLOCK_REALTIME, c); | |
} | |
static float get_clock_diff(Clock_t* c1, Clock_t* c2) | |
{ return (c1->tv_sec - c2->tv_sec) + (c1->tv_nsec - c2->tv_nsec )/1e9; | |
} | |
#else | |
typedef clock_t Clock_t; | |
static void get_clock(Clock_t* c) | |
{ *c = clock(); | |
} | |
static float get_clock_diff(Clock_t* c1, Clock_t* c2) | |
{ return (*c1 - *c2)/(float)CLOCKS_PER_SEC; | |
} | |
#endif | |
//--------------------------------------------------------------------------------------------------------------------- | |
// Execute the read transactions. | |
// | |
void run_transactions(int max_transaction_count, float max_measurement_duration, | |
int sync_begin, int sync_end, int print_output) | |
{ | |
Clock_t measurement_begin_clock; | |
Clock_t measurement_end_clock; | |
int measurement_begin_clocked = 0; | |
int measurement_end_clocked = 0; | |
int measurement_ok = 1; | |
int measuring = 0; | |
float measurement_duration = 0.0; | |
int transaction_count = 0; | |
Clock_t transaction_begin_clock; | |
Clock_t transaction_end_clock; | |
float transaction_duration_max = 0.0; | |
float transaction_duration_min = 1e12; | |
float transaction_duration_sum = 0.0; | |
#if defined(OPC) | |
UA_Int32 value = -1; | |
#elif defined(EPICS) | |
long* pl = (long *)malloc(4*MAX_BENCH_ARRAY); | |
#endif | |
while(!measurement_end_clocked && | |
(max_transaction_count == 0 || transaction_count < max_transaction_count) && | |
(max_measurement_duration == 0.0 || measurement_duration < max_measurement_duration)) | |
{ | |
if(verbose) | |
{ if(measuring && !verify) | |
{ printf("%d \r", transaction_count); | |
} | |
else if(!measuring) | |
{ printf("Waiting ...\r"); | |
} | |
} | |
get_clock(&transaction_begin_clock); | |
#if defined(OPC) | |
// Do the read transaction. | |
UA_ReadResponse resp = UA_Client_read(client, &req); | |
get_clock(&transaction_end_clock); | |
if(resp.responseHeader.serviceResult != UA_STATUSCODE_GOOD) | |
{ measurement_ok = 0; | |
printf("ERROR: Read failed. Service result is not good: 0x%X\n", resp.responseHeader.serviceResult); | |
break; | |
} | |
if(resp.resultsSize != transaction_size) | |
{ measurement_ok = 0; | |
printf("ERROR: Read failed. Result size is: %d, instead of: %d\n", resp.resultsSize, transaction_size); | |
break; | |
} | |
if(!resp.results[0].hasValue) | |
{ measurement_ok = 0; | |
printf("ERROR: Read failed. Result has no value.\n"); | |
break; | |
} | |
if(resp.results[0].value.type != &UA_TYPES[UA_TYPES_INT32]) | |
{ measurement_ok = 0; | |
printf("ERROR: Read failed. Result has wrong type: %d\n", | |
resp.results[0].value.type->typeId.identifier.numeric); | |
break; | |
} | |
if(resp.resultsSize != transaction_size) | |
{ measurement_ok = 0; | |
printf("ERROR: Read failed. Wrong number of results received: %d\n", resp.resultsSize); | |
break; | |
} | |
UA_Variant* variant = &resp.results[0].value; | |
int variant_length = UA_Variant_isScalar(variant) ? 1 : variant->arrayLength; | |
if(variant_length != array_size) | |
{ measurement_ok = 0; | |
printf("ERROR: Read failed. Wrong array size: %d\n", variant_length); | |
break; | |
} | |
value = 0; | |
if(verify && measuring) | |
{ // Retrieve the individual values. | |
int j; | |
for(j = 0; j < resp.resultsSize; j++) | |
{ if(resp.results[j].hasValue && resp.results[j].value.type == &UA_TYPES[UA_TYPES_INT32]) | |
{ UA_Variant* variant = &resp.results[j].value; | |
int variant_length = UA_Variant_isScalar(variant) ? 1 : variant->arrayLength; | |
int k; | |
for(k = 0; k < variant_length; k++) | |
{ value = ((UA_Int32*)variant->data)[k]; | |
if(verbose) | |
{ printf("%d ", value); | |
} | |
} | |
if(verbose) | |
{ printf("\r"); | |
} | |
} | |
} | |
} | |
// Cleanup. | |
UA_ReadResponse_deleteMembers(&resp); | |
#elif defined(EPICS) | |
int retries = 0; | |
int status = ECA_NORMAL; | |
int j = 0; | |
// Loop through all channels per transaction | |
for (j = 0; j < transaction_size && status == ECA_NORMAL; j++) { | |
channel_struct *pch = &channels [j]; | |
int ec = 0; | |
while ((ec == 0) && (retries++ < MAX_CONN_RETRIES)) { | |
ec = ca_element_count (pch -> chid); | |
if (ec == 0) handle_disconnect (ECA_DISCONN, pch); | |
} | |
status = ca_array_get(DBR_LONG, array_size, pch -> chid, (void *) pl); | |
} | |
SEVCHK (status, "ca_array_get failure in loop"); | |
status = ca_pend_io(CA_TIMEOUT_TIME); | |
get_clock(&transaction_end_clock); | |
SEVCHK(status, "ca_pend_io failure in loop"); | |
if (status != ECA_NORMAL) { | |
printf ("Premature exit after %d reads\n", transaction_count); | |
break; | |
} | |
#endif | |
float transaction_duration = get_clock_diff(&transaction_end_clock, &transaction_begin_clock); | |
transaction_duration_min = MIN(transaction_duration, transaction_duration_min); | |
transaction_duration_max = MAX(transaction_duration, transaction_duration_max); | |
transaction_duration_sum += transaction_duration; | |
// Wait with time measurement until trigger file present, that acts as synchronization between multiple clients. | |
if(!measurement_begin_clocked && !measuring && (!sync_begin || access(measure_trigger, F_OK) != -1)) | |
{ get_clock(&measurement_begin_clock); | |
measurement_begin_clocked = 1; | |
measuring = 1; | |
} | |
// End the measurement when trigger file removed, or when max duration reached (used for run-out). | |
// Thus determine the duration of the measurement so far. | |
else if(!measurement_end_clocked && measuring) | |
{ get_clock(&measurement_end_clock); | |
measurement_duration = get_clock_diff(&measurement_end_clock, &measurement_begin_clock); | |
if(sync_end && access(measure_trigger, F_OK) == -1) | |
{ measurement_end_clocked = 1; | |
measuring = 0; | |
} | |
} | |
transaction_count += measuring; | |
} | |
int bytes_per_transaction = array_size * array_item_size * transaction_size; | |
float bandwidth = (float)bytes_per_transaction * transaction_count / measurement_duration; | |
if(measurement_ok && print_output) | |
{ printf("%d transactions, %g seconds, %d B/transaction, %g MB/s", | |
transaction_count, measurement_duration, bytes_per_transaction, bandwidth * 1e-6); | |
if(transaction_duration_sum > 0.0) | |
{ printf(", latency min/avg/max: %g/%g/%g seconds", | |
transaction_duration_min, transaction_duration_sum / transaction_count, transaction_duration_max); | |
} | |
printf("\n"); | |
} | |
} | |
//--------------------------------------------------------------------------------------------------------------------- | |
#if defined(EPICS) | |
// create a variable name from a sequence number and a type, and return it in 'the_name' | |
static void makename (int size, char **the_name) | |
{ | |
char dest [100]; | |
sprintf (dest, "benchlongarray:%d.VAL", size); | |
*the_name = (char *) malloc (strlen (dest) + 1); | |
if (*the_name == NULL) { | |
printf ("ERROR: Failed to allocate %d bytes of memory for variable name %s\n", (int) (strlen (dest)) + 1, dest); | |
exit (1); | |
} | |
strcpy (*the_name, dest); | |
} | |
// Create all EPICS channels --------------------------------------- | |
// Depending on array size (=1 or >1) create an array of scalar channels or an array of array channels | |
static void make_channels (void) | |
{ | |
int i; | |
int status; | |
channel_struct *pcs = &channels [0]; | |
for (i = 0; i < transaction_size; i++) { | |
pcs = &channels [i]; | |
makename (array_size, &(pcs -> pvname)); | |
status = ca_create_channel (pcs->pvname, NULL, NULL, 10, &pcs->chid); | |
if (status != ECA_NORMAL) { | |
printf ("make_channels: ca_create_channel failure, channel %d\n", i); | |
} | |
SEVCHK(status,"ca_create_channel failure"); | |
} | |
status = ca_pend_io(CA_TIMEOUT_TIME); | |
if (status != ECA_NORMAL) { | |
printf ("ca_pend_io failure creating channel %s; offline?\n", pcs -> pvname); | |
} | |
SEVCHK (status, "ca_pend_io failure creating channel"); | |
} | |
// Destroy all created EPICS channels --------------------------------------- | |
static void destroy_channels (void) | |
{ | |
int i; | |
int status; | |
for (i = 0; i < transaction_size; i++) { | |
channel_struct *pcs = &channels [i]; | |
status = ca_clear_channel(pcs->chid); | |
if (status != ECA_NORMAL) { | |
printf ("ca_clear_channel failure, channel %s\n", pcs -> pvname); | |
} | |
SEVCHK(status,"ca_clear_channel failure"); | |
status = ca_pend_io(CA_TIMEOUT_TIME); | |
if (status != ECA_NORMAL) { | |
printf ("ca_pend_io failure destroying channels, channel %s\n", pcs -> pvname); | |
} | |
SEVCHK (status, "ca_pend_io failure destroying channels"); | |
free (pcs -> pvname); | |
} | |
} | |
#endif | |
//--------------------------------------------------------------------------------------------------------------------- | |
// Main app. | |
// | |
int main(int argc, char** argv) | |
{ | |
// Handle command line arguments. | |
parse_arguments(argc, argv); | |
// Setup the connnection. | |
#if defined(OPC) | |
char* server_url = malloc(strlen(protocol) + strlen(server_address) + 1 + strlen(port_nr) + 1); | |
server_url[0] = 0; | |
strcat(server_url, protocol); | |
strcat(server_url, server_address); | |
strcat(server_url, ":"); | |
strcat(server_url, port_nr); | |
// open65241 stack does not support message chunking yet. | |
// Hence single buffer to be specified that is big enough to contain all transaction data, including overhead. | |
int buffer_overhead = 1024; // Extra space for non-payload. | |
int buffer_size = array_size * sizeof(UA_Int32) * transaction_size + buffer_overhead; | |
if(buffer_size < 65536) | |
{ buffer_size = 65536; | |
} | |
UA_ClientConfig config = { | |
.timeout = 5, // sync response timeout in ms | |
.secureChannelLifeTime = 1000000, // lifetime in ms (then the channel needs to be renewed) | |
.timeToRenewSecureChannel = 2000, // time in ms before expiration to renew the secure channel | |
{.protocolVersion = 0, .sendBufferSize = buffer_size, .recvBufferSize = buffer_size, | |
.maxMessageSize = buffer_size, .maxChunkCount = 1 | |
} | |
}; | |
client = UA_Client_new(config /*UA_ClientConfig_standard*/, Logger_Stdout_new()); | |
UA_StatusCode retval = UA_Client_connect(client, ClientNetworkLayerTCP_connect, server_url); | |
if(retval != UA_STATUSCODE_GOOD) | |
{ printf("Aborted.\n"); | |
return retval; | |
} | |
UA_ReadRequest_init(&req); | |
req.nodesToReadSize = transaction_size; | |
req.nodesToRead = UA_Array_new(&UA_TYPES[UA_TYPES_READVALUEID], req.nodesToReadSize); | |
int i; | |
for(i = 0; i < req.nodesToReadSize; i++) | |
{ UA_ReadValueId_init(&(req.nodesToRead[i])); | |
UA_NodeId_init(&(req.nodesToRead[i].nodeId)); | |
req.nodesToRead[i].nodeId = UA_NODEID_STRING_ALLOC(namespace_index, variable_name); // nodeId string deleted with req | |
req.nodesToRead[i].attributeId = UA_ATTRIBUTEID_VALUE; | |
} | |
#elif defined(EPICS) | |
int status; | |
make_channels (); | |
status = ca_context_create(ca_disable_preemptive_callback); | |
SEVCHK(status,"ca_context_create failure"); | |
#endif | |
// Run the transactions. | |
run_transactions(transaction_count, 0.0, sync_begin, sync_end, 1); | |
if(runout_duration > 0.0) | |
{ if(verbose) | |
{ printf("Run-out ...\n"); | |
} | |
run_transactions(0, runout_duration, 0, 0, 0); | |
} | |
// Cleanup. | |
#if defined(OPC) | |
UA_ReadRequest_deleteMembers(&req); | |
UA_Client_disconnect(client); | |
UA_Client_delete(client); | |
free(server_url); | |
#elif defined (EPICS) | |
destroy_channels (); | |
// SEVCHK(ca_context_destroy(),"ca_context_destroy"); | |
SEVCHK(ca_task_exit(),"ca_task_exit failure"); | |
#endif | |
free(variable_name); | |
return 0; | |
} | |
//--------------------------------------------------------------------------------------------------------------------- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
gcc -O3 -DOPC -D_WIN32 -std=c11 client.c open62541.c -lws2_32 -o client.exe | |
gcc -O3 -std=c11 server.c open62541.c -lws2_32 -o server.exe |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
gcc -O3 -DOPC -D_POSIX_C_SOURCE=199309L -std=c11 client.c open62541.c -o client.bin | |
gcc -O3 -std=c11 server.c open62541.c -o server.bin |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import subprocess | |
import threading | |
import time | |
from datetime import datetime | |
client_executable = "client" | |
server_address = "localhost" | |
port_nr = "48010" | |
namespace_index = 1 | |
variable_name = None | |
client_count = 1 | |
array_size = 10 | |
transaction_size = 1 | |
max_transaction_count = 1000000 | |
max_measure_duration = 1 # seconds | |
runout_duration = 0.5 # seconds | |
out_csv = "measurements.csv" | |
max_retries = 2 | |
verify = False | |
sync_begin = True | |
sync_end = True | |
verbose = False | |
args = sys.argv | |
for i in range(1, len(args)): | |
option = args[i] | |
if option == "-v": | |
verbose = True | |
elif option == "-verify": | |
verify = True | |
elif i + 1 < len(args): | |
value = args[i + 1] | |
if option == "-client": | |
client_executable = value | |
elif option == "-server": | |
server_address = value | |
elif option == "-port": | |
port_nr = value | |
elif option == "-ns": | |
namespace_index = int(value) | |
elif option == "-clients": | |
client_count = int(value) | |
elif option == "-var": | |
variable_name = value | |
elif option == "-size": | |
array_size = int(value) | |
elif option == "-vars": | |
transaction_size = int(value) | |
elif option == "-repeat": | |
max_transaction_count = int(value) | |
elif option == "-duration": | |
max_measure_duration = float(value) | |
elif option == "-runout": | |
runout_duration = float(value) | |
elif option == "-csv": | |
out_csv = value | |
args = \ | |
client_executable + \ | |
" -server " + server_address + \ | |
" -port " + port_nr + \ | |
" -ns " + str(namespace_index) + \ | |
" -clients " + str(client_count) + \ | |
" -size " + str(array_size) + \ | |
" -vars " + str(transaction_size) + \ | |
" -repeat " + str(max_transaction_count) + \ | |
" -runout " + str(runout_duration) | |
if variable_name is not None: | |
args += " -var " + variable_name | |
if sync_begin: | |
args += " -sync_begin" | |
if sync_end: | |
args += " -sync_end" | |
if verify: | |
args += " -verify" | |
if verbose: | |
print(args) | |
def run(*args, **kwargs): | |
subprocess.call(args, **kwargs) | |
def start_client(args, client_idx, out_file): | |
thread = threading.Thread(target = run, args = tuple(args.split()), kwargs = {"stdout": out_file}) | |
thread.start() | |
if verbose: | |
print("Client " + str(client_idx) + " started.") | |
return thread | |
measuring = False | |
measurement_done = False | |
retries = 0 | |
while not measurement_done: | |
measure_trigger = "measure" | |
if os.path.exists(measure_trigger): | |
os.remove(measure_trigger) | |
clients = [] | |
for i in range(client_count): | |
if not os.path.exists("logs"): | |
os.mkdir("logs") | |
log_name = "logs/" + str(i) + ".log" | |
log_file = open(log_name, "w") | |
clients += [(start_client(args, i, log_file), i, log_name, log_file)] | |
time.sleep(0.1 * client_count) | |
open(measure_trigger, "w").close() | |
measuring = True | |
if verbose: | |
print("Measuring started.") | |
begin_time = datetime.now() | |
first_client_duration = None | |
transaction_count_sum = 0 | |
transaction_count_min = 1000000000 | |
transaction_count_max = 0 | |
transaction_duration_min = 1e12 | |
transaction_duration_sum = 0 | |
transaction_duration_max = 0 | |
measurement_duration_sum = 0 | |
measurement_duration_min = 1e12 | |
measurement_duration_max = 0 | |
bandwidth_sum = 0 | |
bandwidth_min = 1e12 | |
bandwidth_max = 0.0 | |
measurement_ok = True | |
while clients: | |
for client in clients: | |
thread, i, log_name, log_file = client | |
if not thread.is_alive(): | |
if first_client_duration is None: | |
first_client_duration = (datetime.now() - begin_time).total_seconds() | |
log_file.close() | |
log_file = open(log_name, "r") | |
lines = log_file.readlines() | |
line = lines[0].strip() | |
line_items =line.split() | |
try: | |
transaction_count, measurement_duration, transaction_duration_items = \ | |
int(line_items[0]), float(line_items[2]), line_items[10].split("/") | |
except: | |
transaction_count = measurement_duration = 0 | |
transaction_duration_items = [0, 0, 0] | |
if transaction_count == 0 or measurement_duration == 0: | |
print("ERROR: Client " + str(i) + " transactions failed: " + line) | |
measurement_ok = False | |
array_item_size = 4 # Int32 | |
if measurement_duration > 0: | |
bandwidth = array_size * array_item_size * transaction_size * transaction_count / measurement_duration | |
else: | |
bandwidth = 0 | |
transaction_duration_sum += float(transaction_duration_items[1]) | |
transaction_duration_min = min(transaction_duration_min, float(transaction_duration_items[0])) | |
transaction_duration_max = max(transaction_duration_max, float(transaction_duration_items[2])) | |
measurement_duration_sum += measurement_duration | |
measurement_duration_min = min(measurement_duration_min, measurement_duration) | |
measurement_duration_max = max(measurement_duration_max, measurement_duration) | |
transaction_count_sum += transaction_count | |
transaction_count_min = min(transaction_count_min, transaction_count) | |
transaction_count_max = max(transaction_count_max, transaction_count) | |
bandwidth_sum += bandwidth | |
bandwidth_min = min(bandwidth_min, bandwidth) | |
bandwidth_max = max(bandwidth_max, bandwidth) | |
if verbose: | |
print("Client " + str(i) + " done: " + line) | |
clients.remove(client) | |
break | |
if sync_end and measuring and (datetime.now() - begin_time).total_seconds() > max_measure_duration: | |
while True: | |
# noinspection PyBroadException | |
try: | |
os.remove(measure_trigger) | |
measuring = False | |
break | |
except: | |
assert((datetime.now() - begin_time).total_seconds() < max_measure_duration + 2) | |
if verbose: | |
print("Measuring done. Runout ...") | |
time.sleep(0.01) | |
if verbose: | |
print("All done.") | |
if client_count > 1 and measurement_duration_max > first_client_duration: | |
print("ERROR: Run-out too short. " | |
"Max measurement duration: " + str(measurement_duration_max) + " sec, " | |
"First client duration: " + str(first_client_duration) + " sec") | |
measurement_ok = False | |
if measurement_ok: | |
measurement_done = True | |
else: | |
if retries == max_retries: | |
break | |
print("Retrying ...") | |
retries += 1 | |
runout_duration *= 2 | |
transaction_bytes = array_size * array_item_size * transaction_size | |
transaction_count_avg = transaction_count_sum / client_count | |
transaction_duration_avg = transaction_duration_sum / client_count | |
measurement_duration_avg = measurement_duration_sum / client_count | |
bandwidth_avg = bandwidth_sum / client_count | |
if measurement_duration_max > 0: | |
bandwidth_sum_alt1 = client_count * transaction_bytes * transaction_count_avg / measurement_duration_avg | |
bandwidth_sum_alt2 = client_count * transaction_bytes * transaction_count_avg / measurement_duration_max | |
bandwidth_sum_alt3 = transaction_bytes * transaction_count_sum / measurement_duration_max | |
if not measurement_ok or measurement_duration_max == 0: | |
bandwidth_sum = 0 | |
bandwidth_sum_alt1 = 0 | |
bandwidth_sum_alt2 = 0 | |
bandwidth_sum_alt3 = 0 | |
out_keys = ["client_executable", "client_count", "array_size", "transaction_size", "transaction_bytes", | |
"measurement_duration_min", "measurement_duration_avg", "measurement_duration_max", | |
"transaction_count_min", "transaction_count_avg", "transaction_count_max", | |
"transaction_duration_min", "transaction_duration_avg", "transaction_duration_max", | |
"bandwidth_min", "bandwidth_avg", "bandwidth_max", | |
"bandwidth_sum", "bandwidth_sum_alt1", "bandwidth_sum_alt2", "bandwidth_sum_alt3"] | |
out_data = {key: globals()[key] for key in out_keys} | |
s = "" | |
for key in out_keys: | |
s += key + ": " + str(out_data[key]) + ", " | |
print(s) | |
if transaction_count > 0: | |
csv = open(out_csv, "a") | |
for key in out_keys: | |
csv.write(str(out_data[key]) + ",") | |
csv.write("\n") | |
csv.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
for client in ["./opcClient", "./caClient"]: | |
for client_count in [1 << i for i in range(8)]: | |
for transaction_size in [1 << i for i in range(10)]: | |
for array_size in [1 << i for i in range(20)]: | |
if client == "./opcClient" and (array_size * transaction_size * 4 > 65536 or transaction_size >= 64): | |
print("Skipping: ", array_size, transaction_size) | |
continue | |
os.system("python multi_client.py" | |
" -client " + client + | |
" -server 192.168.0.100" + | |
" -clients " + str(client_count) + | |
" -size " + str(array_size) + | |
" -vars " + str(transaction_size) + | |
"") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <string.h> | |
#include <stdlib.h> | |
#include <signal.h> | |
#include "open62541.h" | |
// Application arguments/defaults | |
static int port_nr = 48010; | |
static int thread_count = 4; | |
static int verbose = 0; | |
// Argument parsing and usage. | |
void print_usage(char *title) | |
{ | |
if(title) | |
{ printf("%s\n", title); | |
} | |
printf( | |
"usage: \n" | |
" [-port port_nr ] (default: %d)\n" | |
" [-threads thread_count] (default: %d)\n" | |
" [-v ] (verbose)\n" | |
" [-h ] (help)\n", | |
port_nr, thread_count | |
); | |
exit(-1); | |
} | |
void parse_arguments(int argc, char** argv) | |
{ | |
for(int i = 1; i < argc; i++) | |
{ if(strcmp(argv[i], "-h") == 0) | |
{ print_usage(NULL); | |
} | |
else if(strcmp(argv[i], "-v") == 0) | |
{ verbose = 1; | |
} | |
else if(i < argc - 1) | |
{ if(strcmp(argv[1], "-port") == 0) | |
{ port_nr = atoi(argv[i+1]); | |
} | |
else if(strcmp(argv[i], "-threads") == 0) | |
{ thread_count = atoi(argv[i+1]); | |
} | |
} | |
} | |
if(verbose) | |
{ printf("%s -port %d -threads %d\n", argv[0], port_nr, thread_count); | |
} | |
} | |
// Define arrays to provide data to the variable nodes. | |
typedef struct | |
{ int size; | |
char* name; | |
UA_Int32* data; | |
} | |
Variable_t; | |
static int variable_sizes[] = | |
{1, 2, 4, 5, 8, 10, 13, 16, 20, 32, 50, 64, 100, 128, 200, 256, 500, 512, 1000, 1024, | |
2000, 2048, 4096, 5000, 8192, 10000, 16384, 20000, 25000, 32768, 50000, 65536, | |
100000, 131072, 200000, 262144, 500000, 524288, 1000000, 1048576, 2097152}; | |
static int max_variable_size; | |
static int variable_count; | |
static Variable_t** variables; | |
void setup_variables() | |
{ | |
variable_count = sizeof(variable_sizes)/sizeof(int); | |
variables = malloc(variable_count * sizeof(Variable_t)); | |
if(variables == NULL) | |
{ printf("ERROR: Memory allocation of variables data failed.\n"); | |
exit(-1); | |
} | |
for(int i = 0; i < variable_count; i++) | |
{ int size = variable_sizes[i]; | |
max_variable_size = size > max_variable_size ? size : max_variable_size; | |
variables[i] = malloc(sizeof(Variable_t)); | |
variables[i]->size = size; | |
variables[i]->data = malloc(size * sizeof(UA_Int32)); | |
for(int j = 0; j < size; j++) | |
{ variables[i]->data[j] = j; | |
} | |
variables[i]->name = malloc(5 + i); | |
sprintf(variables[i]->name, "var%d", size); | |
} | |
} | |
void cleanup_variables() | |
{ for(int i = 0; i < variable_count; i++) | |
{ free(variables[i]->data); | |
free(variables[i]->name); | |
free(variables[i]); | |
} | |
free(variables); | |
} | |
// DataSource callbacks | |
static UA_StatusCode read(void *handle, UA_Boolean includeSourceTimeStamp, const UA_NumericRange *range, UA_DataValue *dataValue) | |
{ | |
Variable_t* variable = handle; | |
if(variable->size == 1) | |
{ UA_Variant_setScalar(&dataValue->value, variable->data, &UA_TYPES[UA_TYPES_INT32]); | |
} | |
else | |
{ UA_Variant_setArray(&dataValue->value, variable->data, variable->size, &UA_TYPES[UA_TYPES_INT32]); | |
} | |
dataValue->status = UA_STATUSCODE_GOOD; | |
dataValue->hasValue = UA_TRUE; | |
dataValue->hasStatus = UA_TRUE; | |
if(includeSourceTimeStamp) | |
{ dataValue->hasSourceTimestamp = UA_TRUE; | |
dataValue->sourceTimestamp = UA_DateTime_now(); | |
} | |
if(verbose) | |
{ printf("%s: %d", variable->name, variable->data[variable->size - 1]); | |
if(dataValue->hasSourceTimestamp) | |
{ UA_ByteString s; | |
UA_DateTime_toString(dataValue->sourceTimestamp, &s); | |
printf(" %s", s.data); | |
UA_ByteString_deleteMembers(&s); | |
} | |
printf(" \r"); | |
} | |
// Increase last array element, in order to give feedback to the client that the data is really updated. | |
variable->data[variable->size - 1]++; | |
return UA_STATUSCODE_GOOD; | |
} | |
static void release(void *handle, UA_DataValue *dataValue) | |
{ | |
if(dataValue->hasValue) | |
{ // UA_Variant_deleteMembers(&dataValue->value); | |
// Not to be deleted because data is not copied into the variant. Just referenced. | |
} | |
} | |
static UA_StatusCode write(void *handle, const UA_Variant *data, const UA_NumericRange *range) | |
{ | |
} | |
// Callback to handle ctrl-c in order to stop the server with cleanup. | |
static UA_Boolean running = UA_TRUE; | |
static void stop_handler(int sign) | |
{ running = UA_FALSE; | |
} | |
// Main app. | |
int main(int argc, char** argv) | |
{ | |
// Handle command line arguments.main.c | |
parse_arguments(argc, argv); | |
// Setup variable node data. | |
setup_variables(); | |
// Initialize server. | |
int buffer_overhead = 1024; // Extra space for non-payload. | |
int buffer_size = max_variable_size + buffer_overhead; | |
UA_ConnectionConfig config = | |
{.protocolVersion = 0, .sendBufferSize = buffer_size, .recvBufferSize = buffer_size, | |
.maxMessageSize = buffer_size, .maxChunkCount = 1}; | |
UA_Server *server = UA_Server_new(UA_ServerConfig_standard); | |
UA_Server_addNetworkLayer(server, ServerNetworkLayerTCP_new(config, port_nr)); | |
// Add the variable nodes. | |
for(int i = 0; i < variable_count; i++) | |
{ printf("Variable: %s\n", variables[i]->name); | |
UA_QualifiedName nodeName = UA_QUALIFIEDNAME(1, variables[i]->name); | |
UA_NodeId nodeId = UA_NODEID_STRING(1, variables[i]->name); | |
UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER); | |
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES); | |
UA_DataSource dataSource = {variables[i], read, release, write}; | |
UA_Server_addDataSourceVariableNode(server, dataSource, nodeName, nodeId, parentNodeId, parentReferenceNodeId); | |
} | |
// Run the server loop. | |
printf("Running ...\n"); | |
signal(SIGINT, stop_handler); /* Catches ctrl-c */ | |
UA_StatusCode retval = UA_Server_run(server, thread_count, &running); | |
// Cleanup. | |
UA_Server_delete(server); | |
cleanup_variables(); | |
printf("\rDone.\n"); | |
return retval; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment