Created: July 5, 2022 14:35
Shell script to generate mock data and perform grid transfer tests between two servers using the GridFTP protocol via the globus-url-copy tool
#!/bin/bash
#Generate random files for simple grid transfer tests and perform the given tests
#NOTE: only the data generation and the last test
#(RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST) should be performed
#
#This test performs transfers with all combinations of the given:
# - data sizes (Ns)
# - number of threads (Np)
# - block sizes (Nbs)
#
#So in total Ns*Np*Nbs transfer tests are performed, covering this 3D parameter space
#
#
#There are pre-defined tests covering only a single-thread or default-block-size slice
#of the aforementioned test.
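#Worked example: with the default arrays set below, FILE_SIZES=(2.0 4.0 8.0),
#TRANSFER_THREADS=(2 4 8) and TRANSFER_BLOCK_SIZE=(1.0 10.0 100.0), the
#complex test performs 3*3*3 = 27 transfers in total.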
#*******************************************************************************
#=== Set up what the script should do ===
DRY_RUN="True"
GENERATE_DATA="True"
RUN_SIMPLE_GRID_TRANSFER_TEST="False"
RUN_PARALLEL_GRID_TRANSFER_TEST="False"
RUN_BLOCK_SIZE_GRID_TRANSFER_TEST="False"
RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST="True"
#*******************************************************************************
#=== Set up environment ===
WORKING_DIR="$(pwd)/"
TEMP_LOGFILE_NAME="temp_logfile.log"
FNAME_BASE="transfer_test_file_size_"
STATFILE_FNAME_BASE="transfer_stats_file_size_"
SOURCE_DIR="cosmogw.kosmo.physik.uni-muenchen.de/home/rozgonyi/grid_transfer_tests/blob/"
DEST_DIR="globus.ilifu.ac.za/idia/users/krozgonyi/grid_transfer_test_results/blob_cosmogw_to_ilifu/"
TMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
#*******************************************************************************
#=== Set timer commands ===
function start_timer () {
    echo "Transfer started: $(date '+%Y-%m-%d %H:%M:%S')" >> "$1"
}
function end_timer () {
    echo "Transfer finished: $(date '+%Y-%m-%d %H:%M:%S')" >> "$1"
}
#*******************************************************************************
#=== Functions ===
function generate_random_file () {
    #Generate a file named $1 with size $2 in MB (!)
    #openssl rand -out $1 $2 #This takes the size in bytes but does not work for large files for some reason
    #Here the dd block size (bs) is set to 1 MB (decimal), so count is the size in MB
    dd if=/dev/urandom bs=1000000 count="${2}" of="$1"
}
function GB_to_MB() {
    #Convert GB to MB
    #S=$(echo $1*1000000000 | bc) #This would convert to bytes
    S=$(echo "$1*1000" | bc)
    echo "${S%.*}"
}
function MB_to_byte() {
    #Convert MB to bytes
    S=$(echo "$1*1000000" | bc)
    echo "${S%.*}"
}
function float_string_to_int() {
    #Round a float-formatted string to an integer
    #NOTE: printf may warn on some inputs; this is ignored for now
    printf "%.0f" "${1}"
}
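#Example usage of the helpers above (a sketch; not executed by this script):
#  generate_random_file test.dat 10   #writes 10 MB of random data to test.dat
#  GB_to_MB 1.5                       #prints 1500
#  MB_to_byte 2.5                     #prints 2500000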
#*******************************************************************************
#=== MAIN ===
#*******************************************************************************
#=== Set up test parameter space ===
declare -a FILE_SIZES=(2.0 4.0 8.0)
#declare -a FILE_SIZES=(1.0 4.0 16.0 64.0 256.0 1024.0)
declare -a TRANSFER_THREADS=(2 4 8)
#declare -a TRANSFER_THREADS=(1 2 4 16 32 64 128 256)
declare -a TRANSFER_BLOCK_SIZE=(1.0 10.0 100.0) #Block size in MB
#declare -a TRANSFER_BLOCK_SIZE=(1.0 10.0 100.0 1000.0 2000.0 4000.0)
#=== Set up basic options for globus-url-copy ===
BASE_TRANSFER_OPTIONS="-v -c -vb -fast"
#*******************************************************************************
#=== Generate random data from list ===
if [ "$GENERATE_DATA" = 'True' ]; then
    for S in "${FILE_SIZES[@]}"; do
        DATAFILE_NAME="${WORKING_DIR}blob/${FNAME_BASE}${S}_GB.dat"
        S_IN_MB=$(float_string_to_int "$(GB_to_MB "$S")")
        echo "Generate file with size ${S}GB as ${DATAFILE_NAME}..."
        if [ "$DRY_RUN" = 'False' ]; then
            generate_random_file "$DATAFILE_NAME" "$S_IN_MB"
        fi
        echo "...done"
    done
fi
#*******************************************************************************
#=== Run simple grid transfer test ===
if [ "$RUN_SIMPLE_GRID_TRANSFER_TEST" = 'True' ]; then
    for S in "${FILE_SIZES[@]}"; do
        DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
        DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
        TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
        STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB.dat"
        STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
        echo "Moving file ${DATAFILE_NAME}..."
        if [ "$DRY_RUN" = 'False' ]; then
            #Perform grid test
            echo "globus-url-copy options: ${BASE_TRANSFER_OPTIONS}" >> "${TEMP_LOGFILE}"
            start_timer "$TEMP_LOGFILE"
            #echo "globus-url-copy -v -c -vb gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR}"
            globus-url-copy ${BASE_TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> "${TEMP_LOGFILE}"
            end_timer "$TEMP_LOGFILE"
            #Convert logfile to readable format > statfile
            sed -e "s/\r/\n/g" "$TEMP_LOGFILE" > "$STATFILE_PATH"
            #Move the statfile; NOTE that the stats/ dir should exist
            mv "$STATFILE_PATH" "${WORKING_DIR}stats/"
            #less $TEMP_LOGFILE
            rm "$TEMP_LOGFILE"
        else
            echo "globus-url-copy options: ${BASE_TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
            echo "start_timer ${TEMP_LOGFILE}"
            echo "globus-url-copy ${BASE_TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
            echo "end_timer ${TEMP_LOGFILE}"
            echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
            echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
            echo "rm $TEMP_LOGFILE"
        fi
        echo "...done"
        echo ""
    done
fi
#*******************************************************************************
#=== Run the same grid transfer test but with increasing levels of parallel transfer ===
if [ "$RUN_PARALLEL_GRID_TRANSFER_TEST" = 'True' ]; then
    for S in "${FILE_SIZES[@]}"; do
        DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
        DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
        TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
        echo "Moving file ${DATAFILE_NAME}..."
        for P in "${TRANSFER_THREADS[@]}"; do
            STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB_${P}_threads.dat"
            STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
            #NOTE: the file is overwritten at the destination, so the time
            #      it takes globus to check this and delete the file will appear in
            #      the time measured via the time stamps!
            TRANSFER_OPTIONS="${BASE_TRANSFER_OPTIONS} -p ${P}"
            if [ "$DRY_RUN" = 'False' ]; then
                echo "...using ${P} threads..."
                #Perform grid test
                echo "globus-url-copy options: ${TRANSFER_OPTIONS}" >> "${TEMP_LOGFILE}"
                start_timer "$TEMP_LOGFILE"
                globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> "$TEMP_LOGFILE"
                end_timer "$TEMP_LOGFILE"
                #Convert logfile to readable format > statfile
                sed -e "s/\r/\n/g" "$TEMP_LOGFILE" > "$STATFILE_PATH"
                #Move the statfile; NOTE that the stats/ dir should exist
                mv "$STATFILE_PATH" "${WORKING_DIR}stats/"
                #less $TEMP_LOGFILE
                rm "$TEMP_LOGFILE"
            else
                echo ""
                echo "...using ${P} threads..."
                echo "globus-url-copy options: ${TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
                echo "start_timer ${TEMP_LOGFILE}"
                echo "globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
                echo "end_timer ${TEMP_LOGFILE}"
                echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
                echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
                echo "rm $TEMP_LOGFILE"
                #echo "globus delete ${DEST_DIR}/${DATAFILE_NAME}" #This would delete the file on the remote end
            fi
        done
        echo "... done"
        echo ""
    done
fi
#*******************************************************************************
#=== Run the same grid transfer test but with increasing block sizes ===
if [ "$RUN_BLOCK_SIZE_GRID_TRANSFER_TEST" = 'True' ]; then
    for S in "${FILE_SIZES[@]}"; do
        DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
        DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
        TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
        echo "Moving file ${DATAFILE_NAME}..."
        for BS_MB in "${TRANSFER_BLOCK_SIZE[@]}"; do
            BS=$(float_string_to_int "$(MB_to_byte "$BS_MB")")
            STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB_${BS_MB}_MB_block_size.dat"
            STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
            #NOTE: the file is overwritten at the destination, so the time
            #      it takes globus to check this and delete the file will appear in
            #      the time measured via the time stamps!
            TRANSFER_OPTIONS="${BASE_TRANSFER_OPTIONS} -bs ${BS}"
            if [ "$DRY_RUN" = 'False' ]; then
                echo "...using ${BS_MB} MB block size..."
                #Perform grid test
                echo "globus-url-copy options: ${TRANSFER_OPTIONS}" >> "${TEMP_LOGFILE}"
                start_timer "$TEMP_LOGFILE"
                globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> "${TEMP_LOGFILE}"
                end_timer "$TEMP_LOGFILE"
                #Convert logfile to readable format > statfile
                sed -e "s/\r/\n/g" "$TEMP_LOGFILE" > "$STATFILE_PATH"
                #Move the statfile; NOTE that the stats/ dir should exist
                mv "$STATFILE_PATH" "${WORKING_DIR}stats/"
                #less $TEMP_LOGFILE
                rm "$TEMP_LOGFILE"
            else
                echo ""
                echo "...using ${BS_MB} MB block size..."
                echo "globus-url-copy options: ${TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
                echo "start_timer ${TEMP_LOGFILE}"
                echo "globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
                echo "end_timer ${TEMP_LOGFILE}"
                echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
                echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
                echo "rm $TEMP_LOGFILE"
                #echo "globus delete ${DEST_DIR}/${DATAFILE_NAME}" #This would delete the file on the remote end
            fi
        done
        echo "... done"
        echo ""
    done
fi
#*******************************************************************************
#=== Run grid transfer test with increasing block size and parallelism ===
if [ "$RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST" = 'True' ]; then
    for S in "${FILE_SIZES[@]}"; do
        DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
        DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
        TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
        echo "Moving file ${DATAFILE_NAME}..."
        for P in "${TRANSFER_THREADS[@]}"; do
            for BS_MB in "${TRANSFER_BLOCK_SIZE[@]}"; do
                BS=$(float_string_to_int "$(MB_to_byte "$BS_MB")")
                STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB_${P}_threads_${BS_MB}_MB_block_size.dat"
                STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
                #NOTE: the file is overwritten at the destination, so the time
                #      it takes globus to check this and delete the file will appear in
                #      the time measured via the time stamps!
                TRANSFER_OPTIONS="${BASE_TRANSFER_OPTIONS} -p ${P} -bs ${BS}"
                if [ "$DRY_RUN" = 'False' ]; then
                    echo "... using ${P} threads with ${BS_MB} MB block size..."
                    #Perform grid test
                    echo "globus-url-copy options: ${TRANSFER_OPTIONS}" >> "${TEMP_LOGFILE}"
                    start_timer "$TEMP_LOGFILE"
                    globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> "${TEMP_LOGFILE}"
                    end_timer "$TEMP_LOGFILE"
                    #Convert logfile to readable format > statfile
                    sed -e "s/\r/\n/g" "$TEMP_LOGFILE" > "$STATFILE_PATH"
                    #Move the statfile; NOTE that the stats/ dir should exist
                    mv "$STATFILE_PATH" "${WORKING_DIR}stats/"
                    #less $TEMP_LOGFILE
                    rm "$TEMP_LOGFILE"
                else
                    echo ""
                    echo "... using ${P} threads with ${BS_MB} MB block size..."
                    echo "globus-url-copy options: ${TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
                    echo "start_timer ${TEMP_LOGFILE}"
                    echo "globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
                    echo "end_timer ${TEMP_LOGFILE}"
                    echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
                    echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
                    echo "rm $TEMP_LOGFILE"
                    #echo "globus delete ${DEST_DIR}/${DATAFILE_NAME}" #This would delete the file on the remote end
                fi
            done
            echo ""
        done
        echo "... done"
        echo ""
    done
fi
"""Utility functions to extract info and tomplot the transfer stats files | |
generated from the globus-url-copy tests. | |
The stats file has to have the following structure: | |
-------------------------------------------------------------------------------- | |
globus-url-copy options: v -c -vb -p 1 | |
Transfer started: 2022-03-15 15:18:55 | |
Source: gsiftp://cosmogw.kosmo.physik.uni-muenchen.de/home/rozgonyi/grid_transfer_tests/ | |
Dest: gsiftp://globus.ilifu.ac.za/users/krozgonyi/grid_transfer_tests/ | |
test.dat | |
29360128 bytes 5.60 MB/sec avg 5.60 MB/sec inst | |
... | |
1000000000 bytes 8.02 MB/sec avg 9.15 MB/sec inst | |
Transfer finished: 2022-03-15 15:20:58 | |
-------------------------------------------------------------------------------- | |
Getting the info out from this file rely on hard-coded line numbers, so diviation | |
from this format will break the code! | |
""" | |
import os
from os import listdir
from os.path import isfile, join
import copy
import logging
import numpy as np
import numpy.ma as ma
from datetime import datetime
from linecache import getline
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.offsetbox import AnchoredText
from matplotlib.patheffects import withStroke
#=== Globals ===
#RCparams for plotting
matplotlib.rcParams['xtick.direction'] = 'in'
matplotlib.rcParams['ytick.direction'] = 'in'
matplotlib.rcParams['xtick.major.size'] = 9
matplotlib.rcParams['ytick.major.size'] = 9
matplotlib.rcParams['xtick.major.width'] = 3
matplotlib.rcParams['ytick.major.width'] = 3
matplotlib.rcParams['xtick.minor.size'] = 6
matplotlib.rcParams['ytick.minor.size'] = 6
matplotlib.rcParams['xtick.minor.width'] = 2
matplotlib.rcParams['ytick.minor.width'] = 2
matplotlib.rcParams['axes.linewidth'] = 2
plt.rcParams['xtick.labelsize'] = 16
plt.rcParams['ytick.labelsize'] = 16
plt.rcParams['legend.fontsize'] = 16
plt.rcParams['legend.framealpha'] = 1.0
plt.rcParams['legend.edgecolor'] = 'black'
#4 sampled colors from viridis
c0 = '#440154' #Purple
c1 = '#30678D' #Blue
c2 = '#35B778' #Greenish
c3 = '#FDE724' #Yellow
outlier_color = 'grey'
#=== Globals II ===
_VALID_AXES = ['p', 'bs', 'S']
#*******************************************************************************
#=== Functions ===
def byte_to_MB(s):
    """Convert bytes to MB
    """
    return float(s / 1000000)

def MB_to_GB(s):
    """Convert MB to GB
    """
    return float(s / 1000)

def MB_per_s_to_Gbps(v):
    """Convert MB/s to Gbps (gigabits per second), i.e. including the byte-to-bit
    conversion
    """
    return float((v * 8) / 1000)

def s_to_hms(t):
    """Convert seconds to an hh:mm:ss format string
    """
    m, s = divmod(t, 60)
    h, m = divmod(m, 60)
    return "{0:02d}:{1:02d}:{2:02d}".format(h, m, s)
def split_date(date_string):
    """Split a date string formatted as yyyy-mm-dd into three variables: y, m and d
    """
    date_chunks = date_string.split('-') #Split by dashes
    #Get values converted to int
    year = int(date_chunks[0])
    month = int(date_chunks[1])
    day = int(date_chunks[2])
    return year, month, day

def split_time(time_string):
    """Split a time string formatted as hh:mm:ss into three variables: h, m and s
    """
    time_chunks = time_string.split(':') #Split by colons
    #Get values converted to int
    hour = int(time_chunks[0])
    minute = int(time_chunks[1])
    second = int(time_chunks[2])
    return hour, minute, second
def get_sfile_name_list_from_standardly_named_files(fsize_list,
                                                    Np_list=None,
                                                    bs_list=None,
                                                    name_base='transfer_stats_file_size_'):
    """Simple code to get a list of stats file names based on the parameters
    provided and the naming scheme used in the `simple_grid_test.sh` script
    Parameters:
    ===========
    fsize_list: list
        List containing the transferred file sizes. The elements can be float,
        int or string type variables. These should be formatted as in the
        `simple_grid_test.sh` script
    Np_list: list of int, optional
        List containing the numbers of parallel threads used
    bs_list: list, optional
        List containing the block sizes formatted as in the `simple_grid_test.sh`
        script. The type can be float, int or str.
    name_base: str, optional
        Should be equivalent to the `STATFILE_FNAME_BASE` variable used in the
        `simple_grid_test.sh` script
    Returns:
    ========
    sfile_list: list of str
        A list containing the file names which can then be read in.
    """
    sfile_list = []
    #When only the file size is provided
    if Np_list is None and bs_list is None:
        for S in fsize_list:
            sfile_list.append('{0:s}{1:s}_GB.dat'.format(name_base, str(S)))
    #When the number of threads is also provided
    elif Np_list is not None and bs_list is None:
        for S in fsize_list:
            for p in Np_list:
                sfile_list.append('{0:s}{1:s}_GB_{2:d}_threads.dat'.format(
                    name_base, str(S), p))
    #When the block sizes (but not the number of parallel threads) are provided
    elif Np_list is None and bs_list is not None:
        for S in fsize_list:
            for bs in bs_list:
                sfile_list.append('{0:s}{1:s}_GB_{2:s}_MB_block_size.dat'.format(
                    name_base, str(S), str(bs)))
    #When all params are provided
    else:
        for S in fsize_list:
            for p in Np_list:
                for bs in bs_list:
                    sfile_list.append('{0:s}{1:s}_GB_{2:d}_threads_{3:s}_MB_block_size.dat'.format(
                        name_base, str(S), p, str(bs)))
    return sfile_list
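
#Example (a sketch): expected names for a 2.0 GB test file moved with 2 and 4 threads
#   get_sfile_name_list_from_standardly_named_files([2.0], Np_list=[2, 4])
#   -> ['transfer_stats_file_size_2.0_GB_2_threads.dat',
#       'transfer_stats_file_size_2.0_GB_4_threads.dat']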
def check_stats_file_structure(sfile):
    """This routine checks if the stats file is structured as expected and raises
    an error for the corresponding part if not
    Should help with processing stats files
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    Returns True if the format is valid, otherwise an error should be raised
    """
    #Just call a bunch of functions that should raise the corresponding error
    _ = get_globus_url_copy_options(sfile) #First line
    _ = get_start_and_end_timestamps(sfile) #Time stamps (2nd and last lines)
    _ = get_source_and_dest_servers(sfile, raw_out=True) #Server names (3rd & 4th lines)
    _ = get_values_from_stats_entry(getline(sfile, 7).strip()) #Test if the 7th line is a stats entry
    _ = get_total_size_transferred(sfile) #Check the last stats entry line if it exists
    return True
def get_globus_url_copy_options(sfile):
    """Get the first line from the stats file
    This line contains the globus-url-copy command options
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    options_string: str
        The first line of the file
    """
    #Get the options line
    with open(sfile, mode='r') as f:
        options_string = f.readline().strip() #The strip removes the newline character
    return options_string
def parser_parameters_from_options_string(options_string):
    """Parse the globus-url-copy options into human-readable and machine-readable
    variables for later automatic processing.
    NOTE that the run parameters, e.g. the number of parallel processes, can be read
    from the options string
    Parameters:
    ===========
    options_string: str
        The first line of the stats file, i.e. obtained via `get_globus_url_copy_options`
    Returns:
    ========
    globus_url_copy_options_dict: dict
        Dictionary containing the option switches and their values, if any
    """
    #Split string
    line_chunks = options_string.split(' ')
    #Check if we read the right string
    if '{0:s} {1:s}'.format(line_chunks[0], line_chunks[1]) != 'globus-url-copy options:':
        raise ValueError('Badly formatted globus-url-copy options string!')
    #Initialise the output dict
    globus_url_copy_options_dict = {}
    #Loop through the options and populate the dict
    for i in range(2, len(line_chunks)):
        if line_chunks[i][0] == '-':
            if i == len(line_chunks) - 1:
                globus_url_copy_options_dict[line_chunks[i]] = None #The last argument is a switch
            else:
                if line_chunks[i+1][0] != '-':
                    globus_url_copy_options_dict[line_chunks[i]] = line_chunks[i+1]
                else:
                    globus_url_copy_options_dict[line_chunks[i]] = None
        else:
            pass
    return globus_url_copy_options_dict
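
#Example (a sketch): parsing a header line built from the options used in
#`simple_grid_test.sh` (the -p and -bs values here are illustrative):
#   parser_parameters_from_options_string(
#       'globus-url-copy options: -v -c -vb -fast -p 4 -bs 1000000')
#   -> {'-v': None, '-c': None, '-vb': None, '-fast': None,
#       '-p': '4', '-bs': '1000000'}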
def get_p_and_bs(sfile):
    """Get the number of parallel threads and the block size from the stats file header
    NOTE the default values are:
        - p = 1
        - bs = 0 MB (because I have no idea what the real default is)
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    p: int
        Number of parallel threads used in the test
    bs: float
        The block size used for the transfer in [MB]
    """
    globus_url_copy_options_dict = \
        parser_parameters_from_options_string(get_globus_url_copy_options(sfile))
    #Set the defaults up front
    p = int(1)
    bs = float(0.0)
    #Get p
    if '-p' in globus_url_copy_options_dict:
        p = int(globus_url_copy_options_dict['-p'])
    elif '-parallel' in globus_url_copy_options_dict:
        p = int(globus_url_copy_options_dict['-parallel'])
    else:
        p = int(1)
    #Get block size
    if '-bs' in globus_url_copy_options_dict:
        bs = byte_to_MB(float(globus_url_copy_options_dict['-bs']))
    elif '-block-size' in globus_url_copy_options_dict:
        bs = byte_to_MB(float(globus_url_copy_options_dict['-block-size']))
    else:
        bs = float(0.0)
    return p, bs
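
#Example (a sketch): for the illustrative header above ('-p 4 -bs 1000000'),
#get_p_and_bs would return p = 4 and bs = byte_to_MB(1000000) = 1.0 [MB]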
def get_start_and_end_timestamps(sfile):
    """Get the start and end timestamps as strings from the stats file
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    start_time: str
        The start date and time formatted as a string
    end_time: str
        The end date and time formatted as a string
    """
    #Get the start time
    #Hard-coded line number of the first timestamp
    #NOTE this is not the first but the second line in the stats file!
    first_line = getline(sfile, 2).strip() #The strip removes the newline character
    #Extract date and time from the string
    line_chunks = first_line.split(' ') #Split by spaces
    if line_chunks[0] + ' ' + line_chunks[1] != 'Transfer started:':
        raise ValueError('The second line of the file contains a wrongly formatted time stamp!')
    start_time = str(line_chunks[2] + ' ' + line_chunks[3])
    #Get the end time (last line of the file)
    with open(sfile, mode='r') as f:
        for line in f:
            pass
        last_line = line #No need for strip
    #Extract date and time from the string
    line_chunks = last_line.split(' ') #Split by spaces
    if line_chunks[0] + ' ' + line_chunks[1] != 'Transfer finished:':
        raise ValueError('The last line of the file contains a wrongly formatted time stamp!')
    end_time = str(line_chunks[2] + ' ' + line_chunks[3])
    return start_time, end_time
def get_source_and_dest_servers(sfile, raw_out=False):
    """Get the strings defining the start and end point servers used for the test
    from the stats file
    NOTE: this function returns the specific locations if raw_out is set!
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    raw_out: bool, opt
        If True, the raw strings are returned, else the code attempts to extract
        the server name from the string
    Returns:
    ========
    source_server: str
        The line string containing the source server
    dest_server: str
        The line string containing the destination server
    """
    #NOTE that the line numbers are hard-coded in the stats file generation (!)
    #Get startpoint server and location
    source_server = getline(sfile, 3).strip() #The strip removes the newline character
    if source_server.split()[0] != 'Source:':
        raise ValueError('Wrongly formatted stats file!')
    #Get endpoint server and location
    dest_server = getline(sfile, 4).strip() #The strip removes the newline character
    if dest_server.split()[0] != 'Dest:':
        raise ValueError('Wrongly formatted stats file!')
    if raw_out:
        return source_server.split()[1], dest_server.split()[1]
    else:
        #Extract server names
        source_server_name = source_server.split()[1].split('/')[2]
        dest_server_name = dest_server.split()[1].split('/')[2]
        return source_server_name, dest_server_name
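
#Example (a sketch): for the sample header in the module docstring,
#   get_source_and_dest_servers(sfile, raw_out=False)
#   -> ('cosmogw.kosmo.physik.uni-muenchen.de', 'globus.ilifu.ac.za')
#since split()[1] takes the URL and split('/')[2] takes its host part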
def get_transfer_time(sfile):
    """Read the total transfer time from the time stamps
    The time stamps should be formatted as:
    `Transfer started: yyyy-mm-dd hh:mm:ss`
    ...file...
    `Transfer finished: yyyy-mm-dd hh:mm:ss`
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    transfer_time: int
        The total transfer time in seconds
    """
    #Get start and end time strings
    start_time_string, end_time_string = get_start_and_end_timestamps(sfile)
    timestamps = []
    for time_stamp in [start_time_string, end_time_string]:
        line_chunks = time_stamp.split(' ') #Split by spaces
        year, month, day = split_date(line_chunks[0])
        hour, minute, second = split_time(line_chunks[1])
        timestamps.append(datetime(year=year, month=month, day=day,
                                   hour=hour, minute=minute, second=second,
                                   microsecond=0)) #We only work with second precision
    #Calculate the time difference in seconds (!)
    #By using the builtin datetime arithmetic, day, month and leap-year
    #boundaries (e.g. February 29th) are handled correctly
    transfer_time = (timestamps[1] - timestamps[0]).total_seconds()
    return int(transfer_time)
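
#Example (a sketch): the timestamp arithmetic above applied to the sample
#header in the module docstring:
#   start = datetime(2022, 3, 15, 15, 18, 55)
#   end   = datetime(2022, 3, 15, 15, 20, 58)
#   (end - start).total_seconds() -> 123.0, so get_transfer_time returns 123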
def get_values_from_stats_entry(stats_entry_line):
    """Get the values of:
        - data transferred [MB]
        - average transfer rate [MB/s]
        - instantaneous transfer rate [MB/s]
    from a single entry line string read from a stats file
    The input entry line thus should be formatted as:
    ` 769916928 bytes 7.73 MB/sec avg 9.40 MB/sec inst`
    Parameters:
    ===========
    stats_entry_line: str
        String containing a single stats entry line from the stats file
    Returns:
    ========
    data_transferred: float
        The data transferred in MB (!)
    avg_transfer_rate: float
        The average data transfer rate in MB/s
    inst_transfer_rate: float
        The instantaneous data transfer rate in MB/s
    """
    #Split the line string
    stat_chunks = stats_entry_line.split()
    #Get data transferred
    if stat_chunks[1] != 'bytes':
        raise ValueError('Wrongly formatted stats line!')
    data_transferred = byte_to_MB(int(stat_chunks[0]))
    #Get average transfer rate
    if stat_chunks[3] != 'MB/sec' or stat_chunks[4] != 'avg':
        raise ValueError('Wrongly formatted stats line!')
    avg_transfer_rate = float(stat_chunks[2])
    #Get instantaneous transfer rate
    if stat_chunks[6] != 'MB/sec' or stat_chunks[7] != 'inst':
        raise ValueError('Wrongly formatted stats line!')
    inst_transfer_rate = float(stat_chunks[5])
    return data_transferred, avg_transfer_rate, inst_transfer_rate
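
#Example (a sketch): parsing the sample entry line from the docstring above
#   get_values_from_stats_entry('769916928 bytes 7.73 MB/sec avg 9.40 MB/sec inst')
#   -> (769.916928, 7.73, 9.4)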
def get_total_size_transferred(sfile, unit='MB'):
    """Read the total amount of data transferred from the file
    NOTE: this might not equal the size of the original file if the transfer fails!
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    unit: str
        The unit in which the data is returned. Valid options are:
        ['MB', 'GB']
    Returns:
    ========
    transfer_size: float
        The total transferred file size in the requested unit
    """
    #Get the number of lines because negative indexing does not work for getline()
    with open(sfile) as f:
        number_of_lines = len(f.readlines())
    last_entry_line_no = int(number_of_lines - 2)
    last_stat_entry = getline(sfile, last_entry_line_no).strip() #The strip removes the newline character
    transfer_size, avg_transfer_rate, inst_transfer_rate = get_values_from_stats_entry(last_stat_entry)
    if unit == 'MB':
        return transfer_size
    elif unit == 'GB':
        return MB_to_GB(transfer_size)
    else:
        raise ValueError('Invalid unit is specified')
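
#Example (a sketch): for the sample file in the module docstring, whose last
#stats entry reads '1000000000 bytes ...', this returns 1000.0 for unit='MB'
#and 1.0 for unit='GB'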
def get_avg_transfer_rate(sfile):
    """Get the average transfer rate:
        - computed manually from the time stamps and the overall transferred size
        - read from the last average transfer rate stats entry
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    manual_avg_transfer_rate: float
        Average data transfer rate computed manually from file size and time stamps
        [MB/s]
    avg_transfer_rate: float
        Average data transfer rate read from the stats file
        [MB/s]
    diff_avg_transfer_rate: float
        Absolute difference between `manual_avg_transfer_rate` and `avg_transfer_rate`
        [MB/s]
    """
    #Compute the manual value
    data_transferred = get_total_size_transferred(sfile, unit='MB')
    transfer_time = get_transfer_time(sfile)
    manual_avg_transfer_rate = data_transferred / transfer_time
    #Get the stats value
    with open(sfile) as f:
        number_of_lines = len(f.readlines())
    last_entry_line_no = int(number_of_lines - 2)
    last_stat_entry = getline(sfile, last_entry_line_no).strip() #The strip removes the newline character
    transfer_size, avg_transfer_rate, inst_transfer_rate = get_values_from_stats_entry(last_stat_entry)
    #Compute the (absolute) difference
    diff_avg_transfer_rate = np.fabs(np.subtract(avg_transfer_rate, manual_avg_transfer_rate))
    #Return both values
    return manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate
def get_stat_arrays(sfile):
    """Get the time-dependent statistics from the file.
    The main lines should have the following format:
    ` 769916928 bytes 7.73 MB/sec avg 9.40 MB/sec inst`
    This routine returns 4 arrays of:
        - a relative measurement time array [arbitrary time units] (!)
        - data transferred [MB]
        - average transfer rate [MB/s]
        - instantaneous transfer rate [MB/s] (measured over the arbitrary time unit)
    Since I have no idea how frequently globus-url-copy updates its stats, I just
    consider this interval as an arbitrary time unit with a CONSTANT update rate.
    As such, the unit of the first vector is meaningless and should be used only
    for qualitative analysis.
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    Returns:
    ========
    time_array: numpy.ndarray
        Array containing the relative measurement times [arbitrary time units] (!)
    size_array: numpy.ndarray
        Array containing the data transferred [MB]
    avg_transfer_rate_array: numpy.ndarray
        Array containing the average transfer rate [MB/s]
    inst_transfer_rate_array: numpy.ndarray
        Array containing the instantaneous transfer rate [MB/s]
    """
    #Get the number of lines in the file
    with open(sfile) as f:
        number_of_lines = len(f.readlines())
    #The first 6 lines are the header
    #The last 2 lines are the trailing empty line and the end time stamp
    #NOTE that the output vectors are initialised with 0-s, hence the extra entry
    N_stats_entries = int(number_of_lines - 7)
    #Initialise output arrays
    time_array = np.zeros(N_stats_entries)
    size_array = np.zeros(N_stats_entries)
    avg_transfer_rate_array = np.zeros(N_stats_entries)
    inst_transfer_rate_array = np.zeros(N_stats_entries)
    line_no = 7 #Hard-coding the first stats entry line
    for i in range(1, N_stats_entries):
        stats_line = getline(sfile, line_no).strip() #The strip removes the newline character
        data_transferred, avg_transfer_rate, inst_transfer_rate = get_values_from_stats_entry(stats_line)
        time_array[i] = i
        size_array[i] = data_transferred
        avg_transfer_rate_array[i] = avg_transfer_rate
        inst_transfer_rate_array[i] = inst_transfer_rate
        line_no += 1
    return time_array, size_array, avg_transfer_rate_array, inst_transfer_rate_array
#*******************************************************************************
#=== Plotting functions ===
def create_stats_summary_plot(sfile, ofname):
    """Create a summary plot from a stats file for a single grid transfer test
    Parameters:
    ===========
    sfile: str
        The name and relative (or full) path to the stats file
    ofname: str
        The output file name and relative (or full) path
    Returns:
    ========
    Creates a summary image
    """
    #Get the stats
    transfer_time = s_to_hms(get_transfer_time(sfile)) #As an hh:mm:ss string
    data_transferred = get_total_size_transferred(sfile, unit='GB')
    manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = get_avg_transfer_rate(sfile)
    time_array, size_array, avg_transfer_rate_array, inst_transfer_rate_array = get_stat_arrays(sfile)
    #Get some strings for the stats
    p, bs = get_p_and_bs(sfile)
    start_time_string, end_time_string = get_start_and_end_timestamps(sfile)
    source_server_string, dest_server_string = get_source_and_dest_servers(sfile, raw_out=False)
    #Create plot
    fig, ax = plt.subplots(1, 1, figsize=(12., 6.5))
    fig.suptitle("Avg transfer rate: {3:.2f} +/- {4:.2f} MB/s | Elapsed time: {7:s}\n"
                 "File size: {0:.2f} GB | Parallel threads: {8:d} | Block size: {9:.2f} MB\n"
                 "Transfer started: {1:s} | finished: {2:s}"
                 "Source: {5:s} | Dest: {6:s}".format(
                     data_transferred, start_time_string, end_time_string,
                     avg_transfer_rate, diff_avg_transfer_rate,
                     source_server_string, dest_server_string,
                     transfer_time, p, bs),
                 fontsize=18)
    #Note that after the time strings a line separator is missing on purpose
    #Adjust for the long title
    fig.subplots_adjust(top=0.8)
    #Original axis
    #Average transfer rate
    lns2 = ax.plot(time_array[1:], avg_transfer_rate_array[1:], lw=4, alpha=0.5, c=c0,
                   label='Avg transfer rate')
    #Instantaneous transfer rate
    lns3 = ax.step(time_array[1:], inst_transfer_rate_array[1:], lw=3, alpha=0.75, c=c2,
                   label='Inst transfer rate')
    #Indicate the "error" based on the manual and stats average transfer rate
    ax.fill_between(time_array[1:], avg_transfer_rate_array[1:] - diff_avg_transfer_rate,
                    avg_transfer_rate_array[1:] + diff_avg_transfer_rate, facecolor=c0, alpha=0.2)
    #Second y axis
    ax2 = ax.twinx()
    lns1 = ax2.plot(time_array, size_array, lw=4, alpha=0.5, color=c1, zorder=0,
                    label='Data transferred')
    ax2.set_yscale("log")
    #Add legend
    lns = lns1 + lns2 + lns3
    labs = [l.get_label() for l in lns]
    ax.legend(lns, labs, loc='best').get_frame().set_linewidth(2)
    #Set zorder
    ax.set_zorder(ax2.get_zorder() + 1)
    ax.patch.set_visible(False)
    ax.grid(lw=1.5, linestyle='-', alpha=.4, color=outlier_color)
    ax2.yaxis.grid(True, which='major', lw=1.5, linestyle='--', alpha=.4, color=c1)
    ax2.set_ylabel(r'Data transferred [MB]', color=c1, fontsize=18)
    ax.set_xlabel(r'Time [arbitrary units]', fontsize=18)
    ax.set_ylabel(r'Transfer rate [MB/s]', fontsize=18)
    plt.savefig(ofname, bbox_inches='tight')
    plt.close()
def quick_and_dirty_simple_analysis_all_stats_results(working_dir_path=None, fig_extension='.png'):
    """Routine to generate summary plots from arbitrary stats files.
    Takes ALL stats files in the `working_dir`/stats/ directory and tries to
    generate the summary plots for all of them. Continues on failure, i.e. if a
    stats file is badly formatted because the transfer failed.
    The output is written to the `working_dir`/figures/ directory using the same
    names as the stats files.
    Parameters:
    ===========
    working_dir_path: str, optional
        Full path of the parent directory of the stats/ and figures/ directories
        The figures directory should have a quick_and_dirty/ subdirectory!
    fig_extension: str, optional
        String defining the type of images created
    Returns:
    ========
    Creates summary plots for all stats files under figures/quick_and_dirty/
    """
    if working_dir_path is None:
        working_dir = './'
    else:
        working_dir = working_dir_path
    stats_dir_path = working_dir + 'stats/'
    #Get the stats file list
    stats_file_list = [f for f in listdir(stats_dir_path) if isfile(join(stats_dir_path, f))]
    if len(stats_file_list) == 0:
        raise ValueError('No stats files found under {0:s} !'.format(stats_dir_path))
    #Loop through the files and create summary plots
    for stats_file in stats_file_list:
        image_name = stats_file.rsplit(".", 1)[0] + fig_extension
        simage_path = working_dir + 'figures/quick_and_dirty/' + image_name
        sfile_path = stats_dir_path + stats_file
        #Try to generate the images
        print('Generating summary image {0:s} ...'.format(image_name))
        try:
            check_stats_file_structure(sfile_path)
            create_stats_summary_plot(sfile_path, ofname=simage_path)
            print('...done')
        except Exception:
            print('... error reading in the file ... proceeding')
def get_parameterspace_slice(stats_file_list,
                             axis_to_get='p',
                             working_dir_path=None,
                             continue_on_size_error=False):
    """The core function to generate transfer rate -- number of threads,
    OR transfer rate -- block size, OR transfer rate -- file size DATA
    The input list should contain stats files which have the same transferred
    data size and the same block size, or the same number of threads,
    or both... i.e. this function gets a data cut in the parameter space
    Obviously the servers should be the same for all tests
    NOTE the code only works if the `stats_file_list` contains only the proper
    stats files! So be careful with the `axis_to_get` value
    Parameters:
    ===========
    stats_file_list: list of str
        The names of the stats files. Note that the files should be under the stats/
        directory. The transferred file and block sizes should be equivalent
    axis_to_get: str, opt
        Whether to get the dependence on the number of parallel threads ('p'), on
        the block size ('bs'), or on the file size ('S')
    working_dir_path: str, optional
        Full path of the parent directory of the stats/ and figures/ directories
    continue_on_size_error: bool, optional
        If a transfer fails (i.e. times out), the sfile is usually generated, but
        not all the data is transferred. In this case this code halts. However,
        if this parameter is set to True, the code runs as normal and returns nan
        values for the failed transfers rather than the measured values
    Returns:
    ========
    dependence_array: list
        Either the number of parallel threads, the block size, or the file size values
    avg_transfer_rate_array: list
        List containing the average transfer rates
    diff_avg_transfer_rate_array: list
        List containing the 'error' for the average transfer rates
    params_dict: dict
        A dictionary containing some metadata shared between the tests as follows:
            params_dict['p'] = p_baseline
            params_dict['bs'] = bs_baseline
            params_dict['source'] = source_server_string_baseline
            params_dict['dest'] = dest_server_string_baseline
            params_dict['S'] = data_transferred_baseline
    """
    if axis_to_get not in _VALID_AXES:
        raise ValueError('Invalid axis for generating dependence vectors is given!')
    if working_dir_path is None:
        working_dir = './'
    else:
        working_dir = working_dir_path
    stats_dir_path = working_dir + 'stats/'
    stats_file_list = [stats_dir_path + sfile for sfile in stats_file_list]
    #Declare arrays to get
    avg_transfer_rate_array = [] #Average transfer rate
    diff_avg_transfer_rate_array = [] #Transfer rate uncertainty
    params_dict = {}
    if axis_to_get == 'p':
        N_parallel_threads = [] #Number of parallel threads
    elif axis_to_get == 'bs':
        bs_array = [] #Block sizes
    elif axis_to_get == 'S':
        S_array = [] #File sizes
    #Initialise the baseline thread count, block size, servers and transferred file size
    p_baseline, bs_baseline = get_p_and_bs(stats_file_list[0])
    source_server_string_baseline, dest_server_string_baseline = \
        get_source_and_dest_servers(stats_file_list[0], raw_out=False)
    data_transferred_baseline = get_total_size_transferred(stats_file_list[0], unit='GB')
    params_dict['S'] = data_transferred_baseline
    params_dict['p'] = p_baseline
    params_dict['bs'] = bs_baseline
    params_dict['source'] = source_server_string_baseline
    params_dict['dest'] = dest_server_string_baseline
    #Loop through the files and gather the relevant data
    for sfile in stats_file_list:
        #Reset the failure flag for each file
        flag = False
        #=== Perform some checks
        #Get p and bs
        p, bs = get_p_and_bs(sfile)
        if axis_to_get == 'p' or axis_to_get == 'S':
            if bs != bs_baseline:
                raise ValueError('Different block size in the input data!')
        elif axis_to_get == 'bs' or axis_to_get == 'S':
            if p != p_baseline:
                raise ValueError('Different number of threads in the input data!')
        #Get the size of the transferred data
        data_transferred = get_total_size_transferred(sfile, unit='GB')
        if axis_to_get == 'S':
            S_array.append(data_transferred)
        else:
            if data_transferred - data_transferred_baseline > 2: #2 GB tolerance...
                if continue_on_size_error:
                    flag = True
                else:
                    raise ValueError('Different transferred data size in the input data!')
        #Get the server locations
        source_server_string, dest_server_string = \
            get_source_and_dest_servers(sfile, raw_out=False)
        if source_server_string != source_server_string_baseline:
            raise ValueError('Different source servers in the input data!')
        if dest_server_string != dest_server_string_baseline:
            raise ValueError('Different destination servers in the input data!')
        #Get the data for the plot
        if axis_to_get == 'p':
            N_parallel_threads.append(p)
        elif axis_to_get == 'bs':
            bs_array.append(bs)
        #Use the value measured by the software
        if axis_to_get != 'S':
            if flag:
                avg_transfer_rate_array.append(np.nan)
                diff_avg_transfer_rate_array.append(np.nan)
            else:
                manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = \
                    get_avg_transfer_rate(sfile)
                avg_transfer_rate_array.append(avg_transfer_rate)
                diff_avg_transfer_rate_array.append(diff_avg_transfer_rate)
        else:
            manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = \
                get_avg_transfer_rate(sfile)
            avg_transfer_rate_array.append(avg_transfer_rate)
            diff_avg_transfer_rate_array.append(diff_avg_transfer_rate)
    if axis_to_get == 'p':
        dependence_array = N_parallel_threads
    elif axis_to_get == 'bs':
        dependence_array = bs_array
    elif axis_to_get == 'S':
        dependence_array = S_array
    return dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict
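
#Example (a sketch; assumes the listed stats files exist under <working_dir>/stats/):
#   sfiles = get_sfile_name_list_from_standardly_named_files(
#       [2.0], Np_list=[2, 4, 8], bs_list=[10.0])
#   p_axis, rates, rate_errs, meta = get_parameterspace_slice(
#       sfiles, axis_to_get='p', working_dir_path='/path/to/tests/')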
def plot_parameterspace_slice_dependence(dependence_array,
                                         avg_transfer_rate_array,
                                         diff_avg_transfer_rate_array,
                                         params_dict,
                                         axis_to_plot='p',
                                         working_dir_path=None,
                                         ofname_base='transfer_dependence_on_',
                                         fig_extension='.png'):
    """The core function to generate a transfer rate -- number of threads,
    OR transfer rate -- block size, OR transfer rate -- data size PLOT
    The input arrays should be generated using the function
    `get_parameterspace_slice`
    Parameters:
    ===========
    dependence_array: list
        Either the number of parallel threads, the block size, or the file size values
    avg_transfer_rate_array: list
        List containing the average transfer rates
    diff_avg_transfer_rate_array: list
        List containing the 'error' for the average transfer rates
    params_dict: dict
        A dictionary containing some metadata shared between the tests as follows:
            params_dict['p'] = p_baseline
            params_dict['bs'] = bs_baseline
            params_dict['source'] = source_server_string_baseline
            params_dict['dest'] = dest_server_string_baseline
            params_dict['S'] = data_transferred_baseline
    axis_to_plot: str, opt
        Whether to plot the dependence on the number of parallel threads ('p'),
        on the block size ('bs'), or on the file size ('S')
    working_dir_path: str, optional
        Full path of the parent directory of the stats/ and figures/ directories
    ofname_base: str, opt
        The base of the output file name
    fig_extension: str, optional
        String defining the type of images created
    Returns:
    ========
    Creates the plot under the figures/ subdir of the `working_dir_path`
    """
    if axis_to_plot not in _VALID_AXES:
        raise ValueError('Invalid axis for generating dependence vectors is given!')
    if working_dir_path is None:
        working_dir = './'
    else:
        working_dir = working_dir_path
    #=== Create the plot
    fig, ax = plt.subplots(1, 1, figsize=(8., 8.5))
    if axis_to_plot == 'p':
        Np_string_list = np.array(dependence_array, dtype=str)
        fig.suptitle("File size: {0:.2f} GB | Block size: {1:.2f} MB\n"
                     "Source: {2:s}\nDest: {3:s}".format(
                         params_dict['S'], params_dict['bs'],
                         params_dict['source'], params_dict['dest']),
                     fontsize=16)
        ax.bar(Np_string_list, avg_transfer_rate_array,
               fc=c1, alpha=0.9)
        ax.errorbar(Np_string_list, avg_transfer_rate_array,
                    yerr=diff_avg_transfer_rate_array, fmt=" ",
                    color=c2, lw=5, alpha=1.0)
        ax.set_xlabel(r'Number of parallel threads', fontsize=18)
        ax.set_ylabel(r'Transfer rate [MB/s]', fontsize=18)
        #Get the name of the output file
        ofname = ofname_base + 'number_of_threads_{0:.2f}_GB_{1:.1f}_MB_block_size{2:s}'.format(
            params_dict['S'], params_dict['bs'], fig_extension)
    elif axis_to_plot == 'bs':
        bs_string_list = np.array(dependence_array, dtype=str)
        fig.suptitle("File size: {0:.2f} GB | N threads: {1:d}\n"
                     "Source: {2:s}\nDest: {3:s}".format(
                         params_dict['S'], params_dict['p'],
                         params_dict['source'], params_dict['dest']),
                     fontsize=16)
        ax.bar(bs_string_list, avg_transfer_rate_array,
               fc=c1, alpha=0.9)
        ax.errorbar(bs_string_list, avg_transfer_rate_array,
                    yerr=diff_avg_transfer_rate_array, fmt=" ",
                    color=c2, lw=5, alpha=1.0)
        ax.set_xlabel(r'Block size [MB]', fontsize=18)
        ax.set_ylabel(r'Transfer rate [MB/s]', fontsize=18)
        #Get the name of the output file
        ofname = ofname_base + 'block_size_{0:.2f}_GB_{1:d}_threads{2:s}'.format(
            params_dict['S'], int(params_dict['p']), fig_extension)
    elif axis_to_plot == 'S':
        S_string_list = np.array(dependence_array, dtype=str)
        fig.suptitle("Parallel threads: {0:d} | Block size: {1:.2f} MB\n"
                     "Source: {2:s}\nDest: {3:s}".format(
                         params_dict['p'], params_dict['bs'],
                         params_dict['source'], params_dict['dest']),
                     fontsize=16)
        ax.bar(S_string_list, avg_transfer_rate_array,
               fc=c1, alpha=0.9)
        ax.errorbar(S_string_list, avg_transfer_rate_array,
                    yerr=diff_avg_transfer_rate_array, fmt=" ",
                    color=c2, lw=5, alpha=1.0)
        ax.set_xlabel(r'Transferred data size [GB]', fontsize=18)
        ax.set_ylabel(r'Transfer rate [MB/s]', fontsize=18)
        #Get the name of the output file
        ofname = ofname_base + 'file_size_{0:d}_threads_{1:.1f}_MB_block_size{2:s}'.format(
            params_dict['p'], params_dict['bs'], fig_extension)
    plt.savefig(working_dir + 'figures/' + ofname, bbox_inches='tight')
    plt.close()
def plot_complex_P_and_BS_results(fsize,
                                  Np_list,
                                  bs_list,
                                  name_base='transfer_stats_file_size_',
                                  fig_extension='.png'):
    """Script to create summary plots from the stats files created by the
    `RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST` of the `simple_grid_test.sh` script
    This script expects all files to exist under the stats/ dir following the
    conventional naming scheme
    If some of the transfers failed, these data are skipped by calling
    `get_parameterspace_slice` with `continue_on_size_error=True`
    NOTE: this function relies on the module-level `working_dir` variable set in
    the __main__ block below
    Parameters:
    ===========
    fsize: float
        The transferred file size in GB
    Np_list: list of int
        List containing the numbers of parallel threads used
    bs_list: list
        List containing the block sizes in MB
    name_base: str, optional
        Should be equivalent to the `STATFILE_FNAME_BASE` variable used in the
        `simple_grid_test.sh` script
    fig_extension: str, optional
        String defining the type of images created
    Returns:
    ========
    Creates the plot under the figures/ subdir of the working directory
    """
    #Define the parameter space shape and initialise the bs vector and the transfer rate matrix
    bs_array = []
    transfer_rate_matrix = np.zeros((len(bs_list), len(Np_list)))
    #Now loop through the bs values and fill up the vectors and matrices to plot
    for block_size, i in zip(bs_list, range(0, len(bs_list))):
        #Get the file names
        sfile_list = get_sfile_name_list_from_standardly_named_files(fsize_list=[fsize],
                                                                     Np_list=Np_list,
                                                                     bs_list=[block_size])
        #Get the values
        dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
            get_parameterspace_slice(sfile_list,
                                     axis_to_get='p',
                                     working_dir_path=working_dir,
                                     continue_on_size_error=True)
        bs_array.append(block_size)
        p_array = np.array(dependence_array)
        transfer_rate_matrix[i, :] = avg_transfer_rate_array
    bs_array = np.array(bs_array)
    #=== Create the plot
    fig, ax = plt.subplots(1, 1, figsize=(12, 8))
    fig.suptitle("File size: {0:.2f} GB\n"
                 "Source: {1:s}\nDest: {2:s}".format(
                     params_dict['S'],
                     params_dict['source'], params_dict['dest']),
                 fontsize=18)
    ax.set_aspect(0.5) #Square-ish pixels
    img = ax.imshow(transfer_rate_matrix)
    cb = plt.colorbar(img, aspect=30, fraction=0.04975, pad=0)
    cb.ax.yaxis.get_offset_text().set_fontsize(18)
    cb.ax.tick_params(labelsize=18)
    cb.ax.tick_params(direction='in', length=6, width=2)
    cb.ax.set_ylabel(r'Transfer rate [MB/s]', fontsize=18)
    ax.set_xticklabels(np.concatenate([[0], p_array]), fontsize=18)
    ax.set_yticklabels(np.concatenate([[0], bs_array]), fontsize=18)
    plt.xlabel(r'Parallel threads', fontsize=18)
    plt.ylabel(r'Block size [MB]', fontsize=18)
    #Save the fig
    ofname = working_dir + 'figures/transfer_summary_{0:s}_GB{1:s}'.format(
        str(fsize), fig_extension)
    plt.savefig(ofname, bbox_inches='tight')
    plt.close()
#*******************************************************************************
#=== MAIN ===
if __name__ == "__main__":
    #Decide what plots to create
    quick_and_dirty_all = False
    plot_1T_file_p_dependence = False
    plot_1T_file_bs_dependence = False
    plot_S_dependence_for_fixed_p_and_bs = False
    plot_full_parameterspace_where_possible = True
    #=== Set up environment
    working_dir = '/home/krozgonyi/GLOW_cluster/grid_tests/'
    test_sname = 'transfer_stats.dat'
    test_spath = working_dir + 'test_stats/' + test_sname
    test_summary_image_path = working_dir + 'figures/test.pdf'
    #=== Testing some of the base code ===
    #check_stats_file_structure(test_spath)
    #transfer_time = get_transfer_time(test_spath)
    #data_transferred = get_total_size_transferred(test_spath, unit='GB')
    #manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = get_avg_transfer_rate(test_spath)
    #time_array, size_array, avg_transfer_rate_array, inst_transfer_rate_array = get_stat_arrays(test_spath)
    #create_stats_summary_plot(test_spath, ofname=test_summary_image_path)
    #exit()
    #=====================================
    #=== Real-life examples ===
    #Quick and dirty diagnostics plots for all files
    if quick_and_dirty_all:
        quick_and_dirty_simple_analysis_all_stats_results()
    #=== Get example thread dependence
    #Get some plots on the dependence on parallel threads using the 1024 GB data
    if plot_1T_file_p_dependence:
        sfile_list = get_sfile_name_list_from_standardly_named_files(
            fsize_list=[1024.0],
            Np_list=[2, 4, 16, 32, 64, 128, 256],
            bs_list=[100.0])
        dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
            get_parameterspace_slice(sfile_list,
                                     axis_to_get='p',
                                     working_dir_path=working_dir)
        plot_parameterspace_slice_dependence(dependence_array,
                                             avg_transfer_rate_array,
                                             diff_avg_transfer_rate_array,
                                             params_dict,
                                             axis_to_plot='p',
                                             working_dir_path=working_dir)
    #=== Get block size dependence
    if plot_1T_file_bs_dependence:
        sfile_list = get_sfile_name_list_from_standardly_named_files(
            fsize_list=[1024.0],
            Np_list=[256],
            bs_list=[10.0, 100.0, 1000.0, 2000.0, 4000.0])
        dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
            get_parameterspace_slice(sfile_list,
                                     axis_to_get='bs',
                                     working_dir_path=working_dir)
        plot_parameterspace_slice_dependence(dependence_array,
                                             avg_transfer_rate_array,
                                             diff_avg_transfer_rate_array,
                                             params_dict,
                                             axis_to_plot='bs',
                                             working_dir_path=working_dir)
    if plot_S_dependence_for_fixed_p_and_bs:
        sfile_list = get_sfile_name_list_from_standardly_named_files(
            fsize_list=[4.0, 16.0, 64.0, 256.0, 1024.0],
            Np_list=[256],
            bs_list=[100.0])
        dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
            get_parameterspace_slice(sfile_list,
                                     axis_to_get='S',
                                     working_dir_path=working_dir)
        plot_parameterspace_slice_dependence(dependence_array,
                                             avg_transfer_rate_array,
                                             diff_avg_transfer_rate_array,
                                             params_dict,
                                             axis_to_plot='S',
                                             working_dir_path=working_dir)
    #=== Create summary plots for the p-bs parameter space
    if plot_full_parameterspace_where_possible:
        fsize_list = [1.0, 4.0, 16.0, 64.0, 256.0]
        for fsize in fsize_list:
            plot_complex_P_and_BS_results(fsize=fsize,
                                          Np_list=[1, 2, 4, 16, 32, 64, 128, 256],
                                          bs_list=[1.0, 10.0, 100.0, 1000.0, 2000.0, 4000.0])