Skip to content

Instantly share code, notes, and snippets.

@vincentbernat
Last active November 10, 2016 13:17
Show Gist options
  • Save vincentbernat/952e0ada1333f0e0a69f to your computer and use it in GitHub Desktop.
Save vincentbernat/952e0ada1333f0e0a69f to your computer and use it in GitHub Desktop.
import os
import pytest
import cs
import hashlib
import time
import socket
import io
from paramiko.client import SSHClient, AutoAddPolicy
from paramiko.rsakey import RSAKey
# Can be run with:
# py.test-3
# py.test-3 --maxfail=5 -v
# Zone to tests
zones = ['ch-dk-2']
# Distributions to test
distributions = ['Linux Debian 7 64-bit', 'Linux Debian 8 64-bit']
#distributions = ['Linux Debian 8 64-bit']
# Disk size to tests
sizes = [10, 50, 100, 200]
#sizes = [50]
# Service offerings to test
serviceofferings = ['Small', 'Medium']
#serviceofferings = ['Small']
# Big files to grab
bigfiles = ['http://ping.online.net/1000Mo.dat']
@pytest.fixture(scope='module')
def cloudstack():
return cs.CloudStack(**cs.read_config())
@pytest.yield_fixture
def sshclient():
with SSHClient() as client:
client.set_missing_host_key_policy(AutoAddPolicy())
yield client
@pytest.fixture(scope='module', params=bigfiles)
def bigfile(request):
return request.param
@pytest.fixture(scope='module', params=zones)
def zoneid(request, cloudstack):
return cloudstack.listZones(name=request.param)['zone'][0]['id']
@pytest.fixture(scope='module', params=distributions)
def distribution(request):
return request.param
@pytest.fixture(scope='module', params=sizes)
def size(request):
return request.param
@pytest.fixture(scope='module', params=serviceofferings)
def serviceofferingid(request, cloudstack):
return cloudstack.listServiceOfferings(
name=request.param)['serviceoffering'][0]['id']
@pytest.fixture(scope='module')
def templateid(cloudstack, zoneid, distribution, size):
templates = cloudstack.listTemplates(templatefilter='featured',
zoneid=zoneid)['template']
templates = [t for t in templates
if t['name'] == distribution and
int(t['size'] / 1024 / 1024 / 1024) == size]
return templates[0]['id']
@pytest.yield_fixture(scope='module')
def securitygroupid(cloudstack):
name = "py.test opencloud #{}".format(os.getpid())
g = cloudstack.createSecurityGroup(name=name)['securitygroup']
cloudstack.authorizeSecurityGroupIngress(icmptype=8,
icmpcode=0,
protocol="ICMP",
cidrlist="0.0.0.0/0",
securitygroupid=g['id'])
cloudstack.authorizeSecurityGroupIngress(protocol="TCP",
cidrlist="0.0.0.0/0",
startport=22,
endport=22,
securitygroupid=g['id'])
yield g['id']
# Cannot do that, machine has to be purged...
# cloudstack.deleteSecurityGroup(id=g['id'])
@pytest.yield_fixture(scope='module')
def keypair(cloudstack):
name = "py.test opencloud #{}".format(os.getpid())
kp = cloudstack.createSSHKeyPair(name=name)['keypair']
yield kp # has name, privatekey and fingerprint
cloudstack.deleteSSHKeyPair(name=name)
@pytest.yield_fixture(scope='module')
def vm(cloudstack, keypair, serviceofferingid, securitygroupid,
templateid, zoneid):
m = hashlib.md5()
m.update(str(serviceofferingid).encode('ascii'))
m.update(str(templateid).encode('ascii'))
m.update(str(zoneid).encode('ascii'))
m.update(str(cloudstack).encode('ascii'))
v = cloudstack.deployVirtualMachine(
serviceofferingid=serviceofferingid,
templateid=templateid,
zoneid=zoneid,
displayname="py.test opencloud #{} {}".format(
os.getpid(),
m.hexdigest()),
securitygroupids=[securitygroupid],
keypair=keypair['name'],
name="pytest-{}-{}".format(os.getpid(),
m.hexdigest()))
yield v
cloudstack.destroyVirtualMachine(id=v['id'])
@pytest.fixture(scope='module')
def deployedvm(cloudstack, vm):
now = time.time()
while True:
j = cloudstack.queryAsyncJobResult(jobid=vm['jobid'])
if j['jobstatus'] != 0:
if j['jobresultcode'] != 0 or j['jobstatus'] != 1:
raise RuntimeError(
'VM was not spawned successfully: {}'.format(j))
if 'jobresult' not in j:
raise RuntimeError(
'No result after spawning the VM: {}'.format(j))
v = j['jobresult']['virtualmachine']
v['ipaddress'] = v['nic'][0]['ipaddress']
time.sleep(2) # Let the VM settle
return v
time.sleep(1)
if time.time() - now > 60:
raise RuntimeError(
'Unable to deploy VM due to timeout: {}'.format(j))
@pytest.fixture(scope='module')
def runningvm(deployedvm):
# We have to wait for the VM to become ready
now = time.time()
while True:
try:
socket.create_connection((deployedvm['ipaddress'], 22), timeout=10)
except socket.error:
if time.time() - now < 120:
continue
raise
break
return deployedvm
@pytest.yield_fixture(scope='module')
def sshvm(runningvm):
with SSHClient() as client:
client.set_missing_host_key_policy(AutoAddPolicy())
client.connect(runningvm['ipaddress'],
username="root",
timeout=10,
password=runningvm['password'],
allow_agent=False,
look_for_keys=False)
yield client
def test_connect_with_key(runningvm, keypair, sshclient):
sshclient.connect(runningvm['ipaddress'],
username="root",
timeout=10,
pkey=RSAKey.from_private_key(
io.StringIO(keypair['privatekey'])),
allow_agent=False,
look_for_keys=False)
stdin, stdout, stderr = sshclient.exec_command('echo hello')
stdin.close()
assert stdout.read() == b"hello\n"
assert stderr.read() == b""
def test_connect_with_password(sshvm):
stdin, stdout, stderr = sshvm.exec_command('echo hello')
stdin.close()
assert stdout.read() == b"hello\n"
assert stderr.read() == b""
def test_machine_name(sshvm):
stdin, stdout, stderr = sshvm.exec_command('uname -n')
stdin.close()
name = "pytest-{}".format(os.getpid()).encode('ascii')
assert stdout.read()[:len(name)] == name
assert stderr.read() == b""
def test_disk_size(sshvm, size):
stdin, stdout, stderr = sshvm.exec_command(
"df --block-size=1G / | tail -1 | awk '{print $2}'")
stdin.close()
assert stderr.read() == b""
realsize = int(stdout.read().strip().decode('ascii'))
assert abs(realsize - size) <= size * 0.11
def test_network_speed(sshvm, bigfile):
# Install curl
stdin, stdout, stderr = sshvm.exec_command(
'apt-get -qq update && apt-get install -qyy curl')
stdin.close()
stdout.read()
stderr.read()
# Execute it
now = time.time()
stdin, stdout, stderr = sshvm.exec_command(
"curl -sLf -w '%{{size_download}}' -o /dev/null {}".format(bigfile))
stdin.close()
assert stderr.read() == b""
size = int(stdout.read().decode('ascii'))
assert size > 0
speed = size / (time.time() - now)
assert speed > 1024 * 1024 * 10 # 10 Mbytes/s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment