Created
March 13, 2015 15:39
-
-
Save paulharter/ce78fa79ac2d6d384941 to your computer and use it in GitHub Desktop.
gateway_view_tests.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import time
import unittest
import uuid

import requests
# Location of the Sync Gateway instance the tests talk to.
SYNC_GATEWAY_HOST = "192.168.59.103"
SYNC_GATEWAY_PORT = "4984"
SYNC_GATEWAY_ADMIN_PORT = "4985"
SYNC_GATEWAY_NAME = "sync_gateway"

# Base URL for every request in this module; note it is built from the
# admin port, not SYNC_GATEWAY_PORT.
SYNC_GATEWAY_URL = "http://%s:%s/%s" % (
    SYNC_GATEWAY_HOST,
    SYNC_GATEWAY_ADMIN_PORT,
    SYNC_GATEWAY_NAME,
)

# Design document with a single view, "all", whose map function emits
# (doc.type, doc) for every document.
VIEW = {
    "_id": "_design/raw",
    "views": {"all": {"map": "function(doc) {emit(doc.type, doc);}"}},
}

# Headers sent with every PUT: JSON request body, JSON response.
HEADERS = {"Content-Type": "application/json", "Accept": "application/json"}
class GateWayViewTests(unittest.TestCase):
    """Integration tests that query a design-document view via Sync Gateway.

    Both tests need a live server reachable at ``SYNC_GATEWAY_URL``. Each one
    installs the ``VIEW`` design document, writes a new document of type
    "dog", and then checks that the new document shows up when the view is
    queried with ``key="dog"``. The only difference between the two tests is
    *when* the view is queried relative to the write.
    """

    # Upper bound, in seconds, for any single HTTP request, so that an
    # unreachable gateway fails the test instead of hanging the run.
    REQUEST_TIMEOUT = 30

    def _put_json(self, doc_id, body):
        """PUT *body* as JSON under *doc_id* and require a 201 response.

        Returns the ``requests`` response object.
        """
        url = "%s/%s" % (SYNC_GATEWAY_URL, doc_id)
        rsp = requests.put(
            url,
            data=json.dumps(body),
            headers=HEADERS,
            timeout=self.REQUEST_TIMEOUT,
        )
        self.assertEqual(rsp.status_code, 201)
        return rsp

    def _install_view_and_add_dog(self):
        """Install the design doc, store a new "dog" document, return its id."""
        # add a design doc with one view that emits (doc.type, doc)
        self._put_json(VIEW["_id"], VIEW)

        # add a document of type "dog"
        dog_id = str(uuid.uuid4())
        print("new dog id %s" % dog_id)
        rsp = self._put_json(dog_id, {"_id": dog_id, "type": "dog", "name": "fly"})
        print(rsp.text)
        return dog_id

    def _query_dogs(self):
        """GET the "all" view restricted to key "dog" and return the response."""
        view_url = '%s/_design/raw/_view/all?key="dog"' % SYNC_GATEWAY_URL
        return requests.get(
            view_url,
            headers={"Accept": "application/json"},
            timeout=self.REQUEST_TIMEOUT,
        )

    def _assert_dog_indexed(self, rsp, dog_id):
        """Fail unless *rsp* is a 200 view result containing a row for *dog_id*."""
        self.assertEqual(rsp.status_code, 200, "couldn't query the view")
        results = rsp.json()
        print("count %s" % results["total_rows"])
        indexed_ids = [row["id"] for row in results["rows"]]
        for indexed_id in indexed_ids:
            print("indexed dog id %s" % indexed_id)
        # uuid4 strings are plain ASCII, so this comparison behaves the same
        # on Python 2 (str vs. unicode) and Python 3.
        self.assertIn(dog_id, indexed_ids)

    def test_query_a_view(self):
        """Query the view once, immediately after writing the document.

        The original author recorded this test as failing.
        NOTE(review): presumably the view index has not yet picked up the
        write when the query runs -- confirm against Sync Gateway's handling
        of stale view results.
        """
        dog_id = self._install_view_and_add_dog()

        # use the view to get all dogs
        rsp = self._query_dogs()
        self._assert_dog_indexed(rsp, dog_id)

    def test_query_a_view_twice_with_pauses(self):
        """Query the view, pause two seconds, query again, check the second result.

        The original author recorded this test as passing.
        """
        time.sleep(2)
        dog_id = self._install_view_and_add_dog()

        # use the view to get all dogs; this first response is deliberately
        # discarded -- only the repeated request below is checked
        self._query_dogs()

        # wait two seconds and repeat the request
        print("*** PAUSE ***")
        time.sleep(2)

        # then re run exactly the same request
        rsp = self._query_dogs()

        # check to see if the new dog is in the list of dogs
        self._assert_dog_indexed(rsp, dog_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment