Last active
July 12, 2017 02:23
-
-
Save kshailen/92da6fff8c6e678a00b2d69ca74f6395 to your computer and use it in GitHub Desktop.
Python program to put key-value pairs in an etcd cluster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Suppose both files are present in the functionalTest directory. The following are ways to run Contexts on them.
#Method 1 | |
[root@marx-vagrant-setup coreos-vagrant]# run-contexts functionalTest | |
10 | |
..... | |
---------------------------------------------------------------------- | |
PASSED! | |
2 contexts, 3 assertions | |
(0.2 seconds) | |
[root@marx-vagrant-setup coreos-vagrant]# | |
#Method 2: | |
[root@marx-vagrant-setup functionalTest]# run-contexts test_Putting_keys.py | |
. | |
---------------------------------------------------------------------- | |
PASSED! | |
1 context, 1 assertion | |
(0.1 seconds) | |
[root@marx-vagrant-setup functionalTest]# run-contexts test_getting_keyValue.py | |
.. | |
---------------------------------------------------------------------- | |
PASSED! | |
1 context, 2 assertions | |
(0.1 seconds) | |
[root@marx-vagrant-setup TEST]# | |
#Another Way: | |
[root@marx-vagrant-setup TEST_TTL]# run-contexts test_PutKeyTTL.py ; sleep 5 ; run-contexts test_GetKeyTTL.py | |
.. | |
---------------------------------------------------------------------- | |
PASSED! | |
1 context, 2 assertions | |
(0.1 seconds) | |
... | |
---------------------------------------------------------------------- | |
PASSED! | |
1 context, 3 assertions | |
(0.1 seconds) | |
[root@marx-vagrant-setup TEST_TTL]# | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import subprocess | |
import json | |
class When_getting_etcd_keys_TTL:
    """Read the ``USINGTTL`` key from etcd and check its value and TTL.

    This context only reads; the key must already have been stored
    (presumably by the companion TTL put context, run shortly before --
    confirm against that file).
    """

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "USINGTTL"
        # Upper bound the TTL reported by etcd is compared against.
        self.TTL = 10
        # Seconds to wait for etcd before failing instead of hanging forever.
        self.timeout = 10

    def when_getting_keys_etcd_cluster(self):
        """Fetch the key through the etcd v2 keys HTTP endpoint."""
        self.req = self.base_url + self.key
        self.resp = requests.get(self.req, timeout=self.timeout)

    def it_should_be_twoHunderd(self):
        """The GET must have answered with HTTP 200."""
        assert self.resp.status_code == 200

    def value_should_be_YES(self):
        """The stored value under ``node.value`` must be "YES"."""
        payload = json.loads(self.resp.content.decode('utf-8'))
        self.retMessage = payload["node"]['value']
        assert self.retMessage == "YES"

    def TTL_should_be_less(self):
        """The TTL under ``node.ttl`` must not exceed ``self.TTL``."""
        payload = json.loads(self.resp.content.decode('utf-8'))
        self.ttl = payload["node"]['ttl']
        assert self.ttl <= self.TTL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import json | |
class When_getting_etcd_keys:
    """Read the ``message`` key from etcd and check it holds "Hello".

    This context only reads; the key must already exist in the cluster.
    """

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "message"
        # Value the key is expected to hold.
        self.val = "Hello"
        # Seconds to wait for etcd before failing instead of hanging forever.
        self.timeout = 10

    def when_getting_keys_etcd_cluster(self):
        """Fetch the key through the etcd v2 keys HTTP endpoint."""
        self.req = self.base_url + self.key
        self.resp = requests.get(self.req, timeout=self.timeout)

    def it_should_be_twoHunderd(self):
        """The GET must have answered with HTTP 200."""
        assert self.resp.status_code == 200

    def value_should_be_HW(self):
        """The stored value under ``node.value`` must equal ``self.val``."""
        payload = json.loads(self.resp.content.decode('utf-8'))
        self.retMessage = payload["node"]["value"]
        assert self.retMessage == self.val
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import subprocess | |
import json | |
class When_putting_etcd_keys:
    """Store the ``DMessage`` key in etcd by invoking curl."""

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "DMessage"
        self.val = "GettingDeleted"

    def when_putting_keys_etcd_cluster(self):
        """PUT the key/value pair and record curl's exit status and output."""
        self.url = self.base_url + self.key
        # Argument list instead of a shell string: no shell parsing of the
        # URL or value.  --fail makes curl exit non-zero on an HTTP error
        # response, so the exit-status assertion reflects etcd's answer;
        # --max-time keeps an unreachable cluster from hanging the run.
        command = [
            "/usr/bin/curl", "--fail", "--max-time", "10",
            "-X", "PUT", self.url,
            "-d", "value=" + self.val,
        ]
        finished = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.status = finished.returncode
        self.output = finished.stdout

    def it_should_be_zero(self):
        """curl must have exited with status 0."""
        assert self.status == 0
class When_Deleting_etcd_keys:
    """Delete the ``DMessage`` key from etcd by invoking curl."""

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "DMessage"

    def When_removing_keys(self):
        """DELETE the key and record curl's exit status and output."""
        self.Delurl = self.base_url + self.key
        # Argument list instead of a shell string.  --fail makes curl exit
        # non-zero on an HTTP error response (e.g. the key does not exist),
        # and --max-time bounds the wait on an unreachable cluster.
        command = [
            "/usr/bin/curl", "--fail", "--max-time", "10",
            "-X", "DELETE", self.Delurl,
        ]
        finished = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.statusD = finished.returncode
        self.outputD = finished.stdout

    def it_should_be_DZero(self):
        """curl must have exited with status 0."""
        assert self.statusD == 0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import json | |
import subprocess | |
class When_putting_getting_deleting_etcd_keys:
    """Put, read back and delete the ``message`` key in one context.

    The etcd host is taken from the ``COREOS_PRIVATE_IPV4`` environment
    variable; constructing the context raises ``KeyError`` when it is unset.
    """

    def __init__(self):
        self.base_url = "http://" + os.environ['COREOS_PRIVATE_IPV4'] + ":2379/v2/keys/"
        self.key = "message"
        self.val = "Hello"
        # Seconds to wait for etcd before failing instead of hanging forever.
        self.timeout = 10
        print(" base url : " + self.base_url)

    def when_putting_getting_and_deleting_keys_etcd_cluster(self):
        """Run PUT (curl), GET (requests) and DELETE (curl) in that order."""
        key_url = self.base_url + self.key

        # PUT.  Commands are argument lists, so the environment-derived host
        # never passes through a shell.  --fail makes curl exit non-zero on
        # an HTTP error response; --max-time bounds the wait.
        self.url = key_url
        put_result = subprocess.run(
            [
                "/usr/bin/curl", "--fail", "--max-time", str(self.timeout),
                "-X", "PUT", self.url,
                "-d", "value=" + self.val,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.status = put_result.returncode
        self.output = put_result.stdout

        # GET the key back before it is removed.
        self.req = key_url
        self.resp = requests.get(self.req, timeout=self.timeout)

        # DELETE as cleanup; its exit status is asserted as well.
        self.Delurl = key_url
        delete_result = subprocess.run(
            [
                "/usr/bin/curl", "--fail", "--max-time", str(self.timeout),
                "-X", "DELETE", self.Delurl,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.statusD = delete_result.returncode
        self.outputD = delete_result.stdout

    def it_should_be_zero(self):
        """Assert the exit status of the PUT."""
        assert self.status == 0

    def it_should_be_twoHunderd(self):
        """Assert the HTTP status code of the GET."""
        assert self.resp.status_code == 200

    def value_should_be_HW(self):
        """Assert the value returned for the key equals ``self.val``."""
        payload = json.loads(self.resp.content.decode('utf-8'))
        self.retMessage = payload["node"]["value"]
        assert self.retMessage == self.val

    def retcode_should_be_zero(self):
        """Assert the exit status of the DELETE."""
        assert self.statusD == 0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import subprocess | |
import json | |
class When_putGetDel_key_TTL:
    """Put the ``USINGTTL`` key with a TTL, read it back, then delete it."""

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "USINGTTL"
        self.val = "YES"
        # TTL sent with the PUT; also the upper bound for the TTL read back.
        self.TTL = 10
        # Seconds to wait for etcd before failing instead of hanging forever.
        self.timeout = 10

    def when_putGetDel_keys_TTL(self):
        """Run PUT with ttl (curl), GET (requests) and DELETE (curl) in order."""
        key_url = self.base_url + self.key

        # PUT with value and ttl.  Commands are argument lists rather than
        # shell strings.  --fail makes curl exit non-zero on an HTTP error
        # response; --max-time bounds the wait.
        self.url = key_url
        put_result = subprocess.run(
            [
                "/usr/bin/curl", "--fail", "--max-time", str(self.timeout),
                "-X", "PUT", self.url,
                "-d", "value=" + self.val,
                "-d", "ttl=" + str(self.TTL),
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.status = put_result.returncode
        self.output = put_result.stdout

        # Getting key with TTL
        self.req = key_url
        self.resp = requests.get(self.req, timeout=self.timeout)

        # Cleanup
        self.Delurl = key_url
        delete_result = subprocess.run(
            [
                "/usr/bin/curl", "--fail", "--max-time", str(self.timeout),
                "-X", "DELETE", self.Delurl,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.statusD = delete_result.returncode
        self.outputD = delete_result.stdout

    def it_should_be_zero(self):
        """Assertion for put: curl must have exited with status 0."""
        assert self.status == 0

    def value_should_be_YES(self):
        """Assertion for get: ``node.value`` must equal ``self.val``."""
        payload = json.loads(self.resp.content.decode('utf-8'))
        self.retMessage = payload["node"]['value']
        assert self.retMessage == self.val

    def TTL_should_be_less(self):
        """Assertion for get: ``node.ttl`` must not exceed ``self.TTL``."""
        payload = json.loads(self.resp.content.decode('utf-8'))
        self.ttl = payload["node"]['ttl']
        assert self.ttl <= self.TTL

    def it_should_be_twoHunderd(self):
        """Assertion for get: the response must be HTTP 200."""
        assert self.resp.status_code == 200
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import subprocess | |
import json | |
class When_putting_etcd_keys_TTL:
    """Store the ``USINGTTL`` key with a TTL in etcd by invoking curl."""

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "USINGTTL"
        self.val = "YES"
        # TTL sent along with the value.
        self.ttl = 10

    def when_putting_keys_etcd_cluster_TTL(self):
        """PUT the value and ttl, recording curl's exit status and output."""
        self.url = self.base_url + self.key
        # Argument list instead of a shell string.  --fail makes curl exit
        # non-zero on an HTTP error response, so the exit-status assertion
        # reflects etcd's answer; --max-time bounds the wait.
        command = [
            "/usr/bin/curl", "--fail", "--max-time", "10",
            "-X", "PUT", self.url,
            "-d", "value=" + self.val,
            "-d", "ttl=" + str(self.ttl),
        ]
        finished = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.status = finished.returncode
        self.output = finished.stdout

    def it_should_be_zero(self):
        """curl must have exited with status 0."""
        assert self.status == 0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import subprocess | |
import json | |
class When_putting_getting_etcd_keys:
    """Store the ``message`` key with value "Hello" in etcd by invoking curl.

    Despite the class name, this context only performs the PUT.
    """

    def __init__(self):
        self.base_url = "http://172.17.8.101:2379/v2/keys/"
        self.key = "message"
        self.val = "Hello"

    def when_putting_keys_etcd_cluster(self):
        """PUT the key/value pair and record curl's exit status and output."""
        self.url = self.base_url + self.key
        # Argument list instead of a shell string.  --fail makes curl exit
        # non-zero on an HTTP error response, so the exit-status assertion
        # reflects etcd's answer; --max-time bounds the wait.
        command = [
            "/usr/bin/curl", "--fail", "--max-time", "10",
            "-X", "PUT", self.url,
            "-d", "value=" + self.val,
        ]
        finished = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        self.status = finished.returncode
        self.output = finished.stdout

    def it_should_be_zero(self):
        """curl must have exited with status 0."""
        assert self.status == 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Context Framework Documentation:
http://contexts.readthedocs.io/en/latest/guide.html#test-discovery
http://contexts.readthedocs.io/en/latest/