Last active
March 27, 2021 03:37
-
-
Save crazyboycjr/24d784f6f84c895c1e89b4c0b6b6b02f to your computer and use it in GitHub Desktop.
start_flow.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import subprocess | |
import threading | |
import shlex | |
import time | |
import argparse | |
# Candidate hosts; positions line up with the flags in hosts_mask.
hosts = [
    '192.168.211.2',
    '192.168.211.34',
    '192.168.211.130',
    '192.168.211.162',
]
# One flag per host: a zero flag drops the host at that position.
hosts_mask = [1, 1, 1, 1]
def get_avail_by_mask(hosts, mask):
    """Return the entries of ``hosts`` whose paired ``mask`` flag is not 0.

    Hosts and flags are paired positionally with ``zip``, so pairing stops
    at the shorter of the two sequences.
    """
    return [host for flag, host in zip(mask, hosts) if flag != 0]
hosts_avail = get_avail_by_mask(hosts, hosts_mask)
# Map every available host to its position in hosts_avail.
ip_to_id = {ip: idx for idx, ip in enumerate(hosts_avail)}
ip_to_id_cnt = len(hosts_avail)
def get_id_from_ip(ip, hosts):
    """Return the index of the first entry of ``hosts`` equal to ``ip``.

    Raises:
        AssertionError: if ``ip`` does not occur in ``hosts``. It is raised
            explicitly rather than through an ``assert`` statement so the
            check is not stripped when Python runs with ``-O``.
    """
    for idx, host in enumerate(hosts):
        if host == ip:
            return idx
    raise AssertionError("no valid id found for ip: {}".format(ip))
def run_task(cmd):
    """Run ``cmd`` as a child process and return ``(stdout, stderr)``.

    ``cmd`` is tokenized with ``shlex.split`` and executed without a shell.
    Both streams are captured as bytes; the exit status is not checked.
    """
    argv = shlex.split(cmd)
    finished = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return finished.stdout, finished.stderr
''' | |
--------------------------------------------------------------------------------------- | |
RDMA_Write BW Test | |
Dual-port : OFF Device : mlx5_0 | |
Number of qps : 1 Transport type : IB | |
Connection type : RC Using SRQ : OFF | |
CQ Moderation : 100 | |
Mtu : 1024[B] | |
Link type : Ethernet | |
GID index : 3 | |
Max inline data : 0[B] | |
rdma_cm QPs : ON | |
Data ex. method : rdma_cm | |
--------------------------------------------------------------------------------------- | |
Waiting for client rdma_cm QP to connect | |
Please run the same command with the IB/RoCE interface IP | |
--------------------------------------------------------------------------------------- | |
local address: LID 0000 QPN 0x2e5e PSN 0x9078af | |
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:22:02 | |
remote address: LID 0000 QPN 0x036e PSN 0x472771 | |
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:24:02 | |
--------------------------------------------------------------------------------------- | |
#bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps] | |
65536 305338 0.00 26.69 0.050913 | |
--------------------------------------------------------------------------------------- | |
''' | |
def parse_ib_write_output(text):
    """Extract both endpoints and the bandwidth figure from ib_write_bw output.

    Args:
        text: raw ``bytes`` output of an ``ib_write_bw`` run.

    Returns:
        ``(local_ip, remote_ip, bw)`` as strings. Each IP is rebuilt from the
        last four colon-separated fields of the line that follows the
        ``local`` / ``remote`` address line; ``bw`` is the fourth
        whitespace-separated column of the line that follows the ``#bytes``
        header.

    Raises:
        ValueError: if any of the three sections is missing or truncated
            (for example when the command produced no output at all).
    """
    lines = text.splitlines()

    def extract_ipv4(line):
        # The IPv4 address is carried in the last four fields of the GID.
        return '.'.join(str(int(field)) for field in line.split(b':')[-4:])

    local_ip = remote_ip = bw = None
    # Pair each line with its successor so that a marker on the very last
    # line cannot index past the end of the output.
    for line, following in zip(lines, lines[1:]):
        if line.startswith(b' local'):
            local_ip = extract_ipv4(following)
        if line.startswith(b' remote'):
            remote_ip = extract_ipv4(following)
        if line.startswith(b' #bytes'):
            words = following.split()
            if len(words) > 3:
                bw = words[3].decode()

    found = (
        ('local address', local_ip),
        ('remote address', remote_ip),
        ('bandwidth row', bw),
    )
    missing = [label for label, value in found if value is None]
    if missing:
        raise ValueError(
            'ib_write_bw output is missing: {}'.format(', '.join(missing)))
    return local_ip, remote_ip, bw
def receiver_task(cmd):
    """Run ``cmd`` and parse its standard output as ib_write_bw output.

    Returns whatever ``parse_ib_write_output`` returns for the captured
    stdout; the captured stderr is discarded.
    """
    stdout, _stderr = run_task(cmd)
    return parse_ib_write_output(stdout)
def emit_pair(client, server, port, duration):
    """Build the shell commands for one client -> server ib_write_bw flow.

    Returns a four-item list: the server command, the client command, and
    the ``cat`` commands printing the server log and the client log. Both
    ib_write_bw commands end in ``&`` and redirect stdout to a per-flow
    file under ``/tmp`` named after client, server and port.
    """
    flow_tag = (client, server, port)
    server_log = '/tmp/start_flow_server_{}_{}_{}.txt'.format(*flow_tag)
    client_log = '/tmp/start_flow_client_{}_{}_{}.txt'.format(*flow_tag)
    server_cmd = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits > {} &'.format(
        port, server_log)
    client_cmd = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits -F -D {} {} > {} &'.format(
        port, duration, server, client_log)
    show_logs = ['cat {}'.format(path) for path in (server_log, client_log)]
    return [server_cmd, client_cmd] + show_logs
def append_cmds(cmds, host, cmd):
    """Append ``cmd`` to the list stored under ``host`` in ``cmds``.

    A new list is created the first time a host is seen. ``cmds`` is
    modified in place and nothing is returned.
    """
    cmds.setdefault(host, []).append(cmd)
def ssh_submit(cmds, ths):
    """Queue one not-yet-started ssh thread per host in ``cmds``.

    The commands of each host are joined with ``;`` and followed by
    ``wait``, then passed to ssh as one double-quoted remote command. The
    ssh command line is printed and a ``threading.Thread`` targeting
    ``run_task`` is appended to ``ths``; starting the threads is left to
    the caller.
    """
    for host, host_cmds in cmds.items():
        remote_script = ';'.join(host_cmds + ['wait'])
        ssh_cmd = 'ssh -p 22 -o StrictHostKeyChecking=no {} "{}"'.format(host, remote_script)
        print(ssh_cmd)
        worker = threading.Thread(target=run_task, args=(ssh_cmd,))
        ths.append(worker)
def ssh_fetch(cmds):
    """Run each ``[host, command]`` pair over ssh and collect the results.

    Every ssh command line is printed before it is run. Returns a list
    holding one ``receiver_task`` result per entry of ``cmds``, in order.
    """
    def fetch_one(host, remote_cmd):
        ssh_cmd = 'ssh -p 22 -o StrictHostKeyChecking=no {} "{}"'.format(host, remote_cmd)
        print(ssh_cmd)
        return receiver_task(ssh_cmd)

    return [fetch_one(host, remote_cmd) for host, remote_cmd in cmds]
def main(args):
    """Start the test flows over ssh and print the parsed server-side logs.

    One flow runs from the first client to the first server, and
    ``args.num_competitors`` flows run from the second client to the
    second server. Every flow gets its own port, counting up from 18000,
    and lasts ``args.duration`` seconds.
    """
    # The first two available hosts act as clients, the rest as servers.
    clients, servers = hosts_avail[:2], hosts_avail[2:]
    server_cmds, client_cmds = {}, {}
    server_threads, client_threads = [], []
    server_log_fetches = []
    next_port = 18000

    def emit_flow(client, server):
        nonlocal next_port
        flow_cmds = emit_pair(client, server, next_port, args.duration)
        server_cmd, client_cmd, cat_server_log = flow_cmds[0], flow_cmds[1], flow_cmds[2]
        append_cmds(server_cmds, server, server_cmd)
        append_cmds(client_cmds, client, client_cmd)
        server_log_fetches.append([server, cat_server_log])
        next_port += 1

    emit_flow(clients[0], servers[0])
    for _ in range(args.num_competitors):
        emit_flow(clients[1], servers[1])

    ssh_submit(server_cmds, server_threads)
    ssh_submit(client_cmds, client_threads)

    for th in server_threads:
        th.start()
    # NOTE(review): the pause is assumed to run once, after all server
    # threads have been started and before any client starts; the source
    # indentation was lost, so confirm it was not meant to be per-thread.
    time.sleep(5)
    for th in client_threads:
        th.start()
    for th in server_threads + client_threads:
        th.join()

    flows = ssh_fetch(server_log_fetches)
    print(flows)
def add_args(parser):
    """Register this script's command-line options on ``parser``."""
    options = (
        (('-n', '--num-competitors'),
         dict(type=int, default=1, help='specify the number of competitor flows')),
        (('-D', '--duration'),
         dict(type=int, default=10, help='flow duration (in seconds)')),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
# Build the command-line parser, read the options and run the test.
parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    description="Bandwidth allocation test.",
)
add_args(parser)
args = parser.parse_args()
main(args)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
import subprocess | |
import threading | |
import shlex | |
import time | |
# Candidate hosts; positions line up with the flags in hosts_mask.
hosts = [
    '10.0.11.2',
    '10.0.13.2',
    '10.0.15.3',
    '10.0.17.2',
    '10.0.22.2',
    '10.0.24.2',
    '10.0.26.2',
    '10.0.28.2',
]
# One flag per host: a zero flag drops the host at that position.
hosts_mask = [0, 0, 0, 0, 1, 1, 1, 1]
#hosts_mask = [1, 1, 1, 1, 0, 0, 0, 0]
def get_avail_by_mask(hosts, mask):
    """Return the entries of ``hosts`` whose paired ``mask`` flag is not 0.

    Hosts and flags are paired positionally with ``zip``, so pairing stops
    at the shorter of the two sequences. An all-zero (or empty) mask yields
    an empty list instead of raising, and no indexing of ``zip``/``filter``
    results is needed, so the function runs on Python 2 and Python 3.
    """
    return [host for flag, host in zip(mask, hosts) if flag != 0]
hosts_avail = get_avail_by_mask(hosts, hosts_mask)
# Map every available host to its position in hosts_avail.
ip_to_id = {ip: idx for idx, ip in enumerate(hosts_avail)}
ip_to_id_cnt = len(hosts_avail)
def get_id_from_ip(ip, hosts):
    """Return the index of the first entry of ``hosts`` equal to ``ip``.

    Raises:
        AssertionError: if ``ip`` does not occur in ``hosts``. It is raised
            explicitly rather than through an ``assert`` statement so the
            check is not stripped when Python runs with ``-O``.
    """
    for idx, host in enumerate(hosts):
        if host == ip:
            return idx
    raise AssertionError("no valid id found for ip: {}".format(ip))
n_worker = len(hosts_avail)
# n_worker x n_worker table of zeros; every row is a separate list.
bw_matrix = [[0] * n_worker for _ in range(n_worker)]
def run_task(cmd):
    """Run ``cmd`` as a child process and return ``(stdout, stderr)``.

    ``cmd`` is tokenized with ``shlex.split`` and executed without a shell.
    Both streams are captured through pipes; the exit status is not checked.
    """
    argv = shlex.split(cmd)
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() waits for the child to exit and yields (stdout, stderr).
    return proc.communicate()
''' | |
--------------------------------------------------------------------------------------- | |
RDMA_Write BW Test | |
Dual-port : OFF Device : mlx5_0 | |
Number of qps : 1 Transport type : IB | |
Connection type : RC Using SRQ : OFF | |
CQ Moderation : 100 | |
Mtu : 1024[B] | |
Link type : Ethernet | |
GID index : 3 | |
Max inline data : 0[B] | |
rdma_cm QPs : ON | |
Data ex. method : rdma_cm | |
--------------------------------------------------------------------------------------- | |
Waiting for client rdma_cm QP to connect | |
Please run the same command with the IB/RoCE interface IP | |
--------------------------------------------------------------------------------------- | |
local address: LID 0000 QPN 0x2e5e PSN 0x9078af | |
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:22:02 | |
remote address: LID 0000 QPN 0x036e PSN 0x472771 | |
GID: 00:00:00:00:00:00:00:00:00:00:255:255:10:00:24:02 | |
--------------------------------------------------------------------------------------- | |
#bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps] | |
65536 305338 0.00 26.69 0.050913 | |
--------------------------------------------------------------------------------------- | |
''' | |
def parse_ib_write_output(text):
    """Extract both endpoints and the bandwidth figure from ib_write_bw output.

    Args:
        text: output of an ``ib_write_bw`` run, as a native string or as
            bytes (bytes are decoded first, so this also works with Python 3
            pipe output).

    Returns:
        ``(local_ip, remote_ip, bw)`` as strings. Each IP is rebuilt from the
        last four colon-separated fields of the line that follows the
        ``local`` / ``remote`` address line, so octets of any width
        (``02``, ``211``) are handled; ``bw`` is the fourth
        whitespace-separated column of the line that follows the ``#bytes``
        header.

    Raises:
        ValueError: if any of the three sections is missing or truncated
            (for example when the command produced no output at all).
    """
    if not isinstance(text, str):
        # Python 3 pipes yield bytes; under Python 2 ``str`` already is bytes.
        text = text.decode()
    lines = text.splitlines()

    def extract_ipv4(line):
        # Split on ':' instead of slicing a fixed 11-character tail, which
        # only worked when every octet was printed with exactly two digits.
        return '.'.join(str(int(field)) for field in line.split(':')[-4:])

    local_ip = remote_ip = bw = None
    # Pair each line with its successor so that a marker on the very last
    # line cannot index past the end of the output.
    for line, following in zip(lines, lines[1:]):
        if line.startswith(' local'):
            local_ip = extract_ipv4(following)
        if line.startswith(' remote'):
            remote_ip = extract_ipv4(following)
        if line.startswith(' #bytes'):
            words = following.split()
            if len(words) > 3:
                bw = words[3]

    found = (
        ('local address', local_ip),
        ('remote address', remote_ip),
        ('bandwidth row', bw),
    )
    missing = [label for label, value in found if value is None]
    if missing:
        raise ValueError(
            'ib_write_bw output is missing: {}'.format(', '.join(missing)))
    return local_ip, remote_ip, bw
def receiver_task(cmd, bw_matrix_ref):
    """Run ``cmd``, parse its output and record the bandwidth in the matrix.

    ``bw_matrix_ref`` is a sequence whose first element is the matrix. The
    parsed bandwidth is stored at ``[src_id][dst_id]``, where the source is
    the remote address reported in the output and the destination is the
    local one; both ids are positions in ``hosts_avail``.
    """
    stdout, _stderr = run_task(cmd)
    dst_ip, src_ip, bandwidth = parse_ib_write_output(stdout)
    src_id = get_id_from_ip(src_ip, hosts_avail)
    dst_id = get_id_from_ip(dst_ip, hosts_avail)
    assert src_id != dst_id, '({}, {}), ({}, {})'.format(src_ip, src_id, dst_ip, dst_id)
    matrix = bw_matrix_ref[0]
    matrix[src_id][dst_id] = bandwidth
def emit_pair(client, server, port, duration=10):
    """Build the shell commands for one client -> server ib_write_bw flow.

    Args:
        client: client host, used only in the log file names.
        server: server host the client command connects to.
        port: port passed to both sides with ``-p``.
        duration: value passed to the client with ``-D``. Defaults to 10,
            the value that used to be hard-coded.

    Returns:
        A four-item list: the server command, the client command, and the
        ``cat`` commands printing the server log and the client log. Both
        ib_write_bw commands end in ``&`` and redirect stdout to a per-flow
        file under ``/tmp``.
    """
    flow_tag = (client, server, port)
    server_log = '/tmp/start_flow_server_{}_{}_{}.txt'.format(*flow_tag)
    client_log = '/tmp/start_flow_client_{}_{}_{}.txt'.format(*flow_tag)
    cmd_s = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits > {} &'.format(port, server_log)
    cmd_c = 'ib_write_bw -d mlx5_0 -R -p {} --report_gbits -F -D {} {} > {} &'.format(
        port, duration, server, client_log)
    return [cmd_s, cmd_c, 'cat {}'.format(server_log), 'cat {}'.format(client_log)]
def append_cmds(cmds, host, cmd):
    """Append ``cmd`` to the list stored under ``host`` in ``cmds``.

    A new list is created the first time a host is seen. ``cmds`` is
    modified in place and nothing is returned. ``dict.setdefault`` replaces
    the ``has_key`` lookup, which no longer exists in Python 3.
    """
    cmds.setdefault(host, []).append(cmd)
def ssh_submit(cmds, ths):
    """Queue one not-yet-started ssh thread per host in ``cmds``.

    The commands of each host are joined with ``;`` and followed by
    ``wait``, then passed to ssh as one double-quoted remote command. The
    ssh command line is printed and a ``threading.Thread`` targeting
    ``run_task`` is appended to ``ths``; starting the threads is left to
    the caller.
    """
    for host, host_cmds in cmds.items():
        remote_script = ';'.join(host_cmds + ['wait'])
        cmd = 'ssh -o StrictHostKeyChecking=no {} "{}"'.format(host, remote_script)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(cmd)
        ths.append(threading.Thread(target=run_task, args=(cmd,)))
def ssh_fetch(cmds):
    """Run each ``[host, command]`` pair over ssh and record its bandwidth.

    Every ssh command line is printed, then handed to ``receiver_task``
    together with the module-level ``bw_matrix`` wrapped in a one-item list.
    Nothing is returned.
    """
    for host, remote_cmd in cmds:
        ssh_cmd = 'ssh -o StrictHostKeyChecking=no {} "{}"'.format(host, remote_cmd)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(ssh_cmd)
        receiver_task(ssh_cmd, [bw_matrix])
def print_bw_matrix(title, bw_mat):
    """Print ``title`` followed by ``bw_mat`` as a tab-separated table.

    The first table row is a corner label plus ``hosts_avail``; each later
    row is the host at that index followed by the matching row of
    ``bw_mat``. Every cell is converted with ``str`` and each column is
    padded to the width of its widest cell.
    """
    print(title)
    # The backslash is escaped explicitly; the printed label is unchanged.
    rows = [['src\\dst'] + hosts_avail]
    for idx, bw_row in enumerate(bw_mat):
        rows.append([hosts_avail[idx]] + bw_row)
    cells = [[str(e) for e in row] for row in rows]
    widths = [max(len(cell) for cell in col) for col in zip(*cells)]
    fmt = '\t'.join('{{:{}}}'.format(width) for width in widths)
    print('\n'.join(fmt.format(*row) for row in cells))
def main():
    """Run one flow per ordered pair of distinct hosts and print the matrix.

    The first four available hosts act as both clients and servers. Every
    client/server pair with different hosts gets its own port, counting up
    from 18000. After all ssh threads finish, the server logs are fetched
    and the resulting bandwidth matrix is printed.
    """
    ths_s, ths_c = [], []
    servers = hosts_avail[0:4]
    clients = hosts_avail[0:4]
    cmds_s, cmds_c = {}, {}
    fetch_cmds_s = []
    base_port = 18000

    pairs = [(c, s) for c in clients for s in servers if c != s]
    for port, (client, server) in enumerate(pairs, base_port):
        flow_cmds = emit_pair(client, server, port)
        append_cmds(cmds_s, server, flow_cmds[0])
        append_cmds(cmds_c, client, flow_cmds[1])
        fetch_cmds_s.append([server, flow_cmds[2]])

    ssh_submit(cmds_s, ths_s)
    ssh_submit(cmds_c, ths_c)

    for th in ths_s:
        th.start()
    # NOTE(review): the pause is assumed to run once, after all server
    # threads have been started and before any client starts; the source
    # indentation was lost, so confirm it was not meant to be per-thread.
    time.sleep(1)
    for th in ths_c:
        th.start()
    for th in ths_s + ths_c:
        th.join()

    ssh_fetch(fetch_cmds_s)
    print_bw_matrix('Bandwidth Matrix', bw_matrix)


main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
One of the problems I encountered and diagnosed concerns DCQCN fairness. The DCQCN algorithm is implemented in the NIC firmware, so different firmware versions can cause fairness issues that are hard to find and locate.