Last active
November 10, 2016 13:17
-
-
Save vincentbernat/952e0ada1333f0e0a69f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pytest | |
import cs | |
import hashlib | |
import time | |
import socket | |
import io | |
from paramiko.client import SSHClient, AutoAddPolicy | |
from paramiko.rsakey import RSAKey | |
# Can be run with: | |
# py.test-3 | |
# py.test-3 --maxfail=5 -v | |
# Zone to tests | |
zones = ['ch-dk-2'] | |
# Distributions to test | |
distributions = ['Linux Debian 7 64-bit', 'Linux Debian 8 64-bit'] | |
#distributions = ['Linux Debian 8 64-bit'] | |
# Disk size to tests | |
sizes = [10, 50, 100, 200] | |
#sizes = [50] | |
# Service offerings to test | |
serviceofferings = ['Small', 'Medium'] | |
#serviceofferings = ['Small'] | |
# Big files to grab | |
bigfiles = ['http://ping.online.net/1000Mo.dat'] | |
@pytest.fixture(scope='module') | |
def cloudstack(): | |
return cs.CloudStack(**cs.read_config()) | |
@pytest.yield_fixture | |
def sshclient(): | |
with SSHClient() as client: | |
client.set_missing_host_key_policy(AutoAddPolicy()) | |
yield client | |
@pytest.fixture(scope='module', params=bigfiles) | |
def bigfile(request): | |
return request.param | |
@pytest.fixture(scope='module', params=zones) | |
def zoneid(request, cloudstack): | |
return cloudstack.listZones(name=request.param)['zone'][0]['id'] | |
@pytest.fixture(scope='module', params=distributions) | |
def distribution(request): | |
return request.param | |
@pytest.fixture(scope='module', params=sizes) | |
def size(request): | |
return request.param | |
@pytest.fixture(scope='module', params=serviceofferings) | |
def serviceofferingid(request, cloudstack): | |
return cloudstack.listServiceOfferings( | |
name=request.param)['serviceoffering'][0]['id'] | |
@pytest.fixture(scope='module') | |
def templateid(cloudstack, zoneid, distribution, size): | |
templates = cloudstack.listTemplates(templatefilter='featured', | |
zoneid=zoneid)['template'] | |
templates = [t for t in templates | |
if t['name'] == distribution and | |
int(t['size'] / 1024 / 1024 / 1024) == size] | |
return templates[0]['id'] | |
@pytest.yield_fixture(scope='module') | |
def securitygroupid(cloudstack): | |
name = "py.test opencloud #{}".format(os.getpid()) | |
g = cloudstack.createSecurityGroup(name=name)['securitygroup'] | |
cloudstack.authorizeSecurityGroupIngress(icmptype=8, | |
icmpcode=0, | |
protocol="ICMP", | |
cidrlist="0.0.0.0/0", | |
securitygroupid=g['id']) | |
cloudstack.authorizeSecurityGroupIngress(protocol="TCP", | |
cidrlist="0.0.0.0/0", | |
startport=22, | |
endport=22, | |
securitygroupid=g['id']) | |
yield g['id'] | |
# Cannot do that, machine has to be purged... | |
# cloudstack.deleteSecurityGroup(id=g['id']) | |
@pytest.yield_fixture(scope='module') | |
def keypair(cloudstack): | |
name = "py.test opencloud #{}".format(os.getpid()) | |
kp = cloudstack.createSSHKeyPair(name=name)['keypair'] | |
yield kp # has name, privatekey and fingerprint | |
cloudstack.deleteSSHKeyPair(name=name) | |
@pytest.yield_fixture(scope='module') | |
def vm(cloudstack, keypair, serviceofferingid, securitygroupid, | |
templateid, zoneid): | |
m = hashlib.md5() | |
m.update(str(serviceofferingid).encode('ascii')) | |
m.update(str(templateid).encode('ascii')) | |
m.update(str(zoneid).encode('ascii')) | |
m.update(str(cloudstack).encode('ascii')) | |
v = cloudstack.deployVirtualMachine( | |
serviceofferingid=serviceofferingid, | |
templateid=templateid, | |
zoneid=zoneid, | |
displayname="py.test opencloud #{} {}".format( | |
os.getpid(), | |
m.hexdigest()), | |
securitygroupids=[securitygroupid], | |
keypair=keypair['name'], | |
name="pytest-{}-{}".format(os.getpid(), | |
m.hexdigest())) | |
yield v | |
cloudstack.destroyVirtualMachine(id=v['id']) | |
@pytest.fixture(scope='module') | |
def deployedvm(cloudstack, vm): | |
now = time.time() | |
while True: | |
j = cloudstack.queryAsyncJobResult(jobid=vm['jobid']) | |
if j['jobstatus'] != 0: | |
if j['jobresultcode'] != 0 or j['jobstatus'] != 1: | |
raise RuntimeError( | |
'VM was not spawned successfully: {}'.format(j)) | |
if 'jobresult' not in j: | |
raise RuntimeError( | |
'No result after spawning the VM: {}'.format(j)) | |
v = j['jobresult']['virtualmachine'] | |
v['ipaddress'] = v['nic'][0]['ipaddress'] | |
time.sleep(2) # Let the VM settle | |
return v | |
time.sleep(1) | |
if time.time() - now > 60: | |
raise RuntimeError( | |
'Unable to deploy VM due to timeout: {}'.format(j)) | |
@pytest.fixture(scope='module') | |
def runningvm(deployedvm): | |
# We have to wait for the VM to become ready | |
now = time.time() | |
while True: | |
try: | |
socket.create_connection((deployedvm['ipaddress'], 22), timeout=10) | |
except socket.error: | |
if time.time() - now < 120: | |
continue | |
raise | |
break | |
return deployedvm | |
@pytest.yield_fixture(scope='module') | |
def sshvm(runningvm): | |
with SSHClient() as client: | |
client.set_missing_host_key_policy(AutoAddPolicy()) | |
client.connect(runningvm['ipaddress'], | |
username="root", | |
timeout=10, | |
password=runningvm['password'], | |
allow_agent=False, | |
look_for_keys=False) | |
yield client | |
def test_connect_with_key(runningvm, keypair, sshclient): | |
sshclient.connect(runningvm['ipaddress'], | |
username="root", | |
timeout=10, | |
pkey=RSAKey.from_private_key( | |
io.StringIO(keypair['privatekey'])), | |
allow_agent=False, | |
look_for_keys=False) | |
stdin, stdout, stderr = sshclient.exec_command('echo hello') | |
stdin.close() | |
assert stdout.read() == b"hello\n" | |
assert stderr.read() == b"" | |
def test_connect_with_password(sshvm): | |
stdin, stdout, stderr = sshvm.exec_command('echo hello') | |
stdin.close() | |
assert stdout.read() == b"hello\n" | |
assert stderr.read() == b"" | |
def test_machine_name(sshvm): | |
stdin, stdout, stderr = sshvm.exec_command('uname -n') | |
stdin.close() | |
name = "pytest-{}".format(os.getpid()).encode('ascii') | |
assert stdout.read()[:len(name)] == name | |
assert stderr.read() == b"" | |
def test_disk_size(sshvm, size): | |
stdin, stdout, stderr = sshvm.exec_command( | |
"df --block-size=1G / | tail -1 | awk '{print $2}'") | |
stdin.close() | |
assert stderr.read() == b"" | |
realsize = int(stdout.read().strip().decode('ascii')) | |
assert abs(realsize - size) <= size * 0.11 | |
def test_network_speed(sshvm, bigfile): | |
# Install curl | |
stdin, stdout, stderr = sshvm.exec_command( | |
'apt-get -qq update && apt-get install -qyy curl') | |
stdin.close() | |
stdout.read() | |
stderr.read() | |
# Execute it | |
now = time.time() | |
stdin, stdout, stderr = sshvm.exec_command( | |
"curl -sLf -w '%{{size_download}}' -o /dev/null {}".format(bigfile)) | |
stdin.close() | |
assert stderr.read() == b"" | |
size = int(stdout.read().decode('ascii')) | |
assert size > 0 | |
speed = size / (time.time() - now) | |
assert speed > 1024 * 1024 * 10 # 10 Mbytes/s |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment