-
-
Save joshmoore/830c676d51727ae5c839 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 Glencoe Software, Inc.
# All rights reserved.
import os
import random
import string
import sys
import tempfile
import time

import omero.gateway
from omero.rtypes import rstring
from omero.rtypes import unwrap
# Default connection settings used when Fixture() is built without arguments.
HOST = "localhost"
PORT = 4064
USERNAME = "root"
PASSWORD = "ome"
# Default length of the words produced by Fixture.get_random_word().
RANDOM_LENGTH = 10


class Fixture(object):
    """
    Connects to an OMERO server and runs search-index checks against it.

    The ``*_test`` methods create images, annotate them, attach a large
    dummy file and then query the search service to see whether the
    annotations are still found.
    """

    # Seconds to sleep so the search indexer can catch up before querying.
    INDEX_WAIT_SECONDS = 10
    # Maximum number of sleep/query rounds performed by full_test().
    INDEX_POLL_ATTEMPTS = 10
    # Size, in MB, of the dummy file attached by the *_test methods.
    DUMMY_FILE_MB = 510

    def __init__(self,
                 username=USERNAME, password=PASSWORD,
                 host=HOST, port=PORT):
        """
        Open a gateway connection and look up the services used below.

        Raises RuntimeError when the connection cannot be established.
        """
        self.conn = omero.gateway.BlitzGateway(
            username, password, host=host, port=port)
        if not self.conn.connect():
            # Fail fast: every other method needs a live connection, so
            # carrying on after a failed connect only defers the error.
            raise RuntimeError(
                "Problem connecting to OMERO server %s:%s" % (host, port))
        self.queryService = self.conn.getQueryService()
        self.pixelsService = self.conn.getPixelsService()
        self.updateService = self.conn.getUpdateService()

    def addDummyImage(self):
        """
        Add a single pixel image just so we can attach annotations...

        Returns the id object handed back by the pixels service; callers
        in this class read its value through ``.val`` / ``.getValue()``.
        """
        pixelQuery = "from PixelsType as p where p.value='int8'"
        pixelType = self.queryService.findByQuery(pixelQuery, None)
        # [0] is the explicit form of the former range(1).
        imgId = self.pixelsService.createImage(
            1, 1, 1, 1, [0], pixelType,
            "test_image", None, self.conn.SERVICE_OPTS)
        return imgId

    def tagImage(self, imgId, tagName):
        """
        Attach a tag annotation to an image

        Returns the tag wrapper that was linked.
        """
        image = self.conn.getObject("Image", imgId)
        tag = omero.gateway.TagAnnotationWrapper()
        tag.setValue(tagName)
        image.linkAnnotation(tag)
        return tag

    def attachDummyFileToImage(self, imgId, fileSizeInMb):
        """
        Generates a file of set size and attaches it to an image

        The file is written as ``fileSizeInMb`` chunks of just under 1 MiB
        each and is removed again whether or not the upload succeeds.
        """
        dummyStr = "This is only a test... "
        # Floor division keeps the repeat count an int on every Python
        # version; the chunk is built once and written repeatedly instead
        # of materialising the whole file content in memory.
        chunk = dummyStr * (1024 * 1024 // len(dummyStr))

        # A private directory avoids clashes between concurrent runs while
        # keeping the uploaded file's name ("tmp.txt") unchanged.
        tmpDir = tempfile.mkdtemp()
        tmpPath = os.path.join(tmpDir, "tmp.txt")
        try:
            # The with-block closes (and so flushes) the file before the
            # upload reads it back from disk.
            with open(tmpPath, "w") as tmp:
                for _ in range(fileSizeInMb):
                    tmp.write(chunk)
            fileAnn = self.conn.createFileAnnfromLocalFile(
                tmpPath, mimetype="text/plain")
            image = self.conn.getObject("Image", imgId)
            image.linkAnnotation(fileAnn)
        finally:
            if os.path.exists(tmpPath):
                os.remove(tmpPath)
            os.rmdir(tmpDir)

    def get_random_word(self, wordLen=RANDOM_LENGTH):
        """
        Return a string of ``wordLen`` random ASCII letters.
        """
        return "".join(
            random.choice(string.ascii_letters) for _ in range(wordLen))

    def found(self, imgId, text):
        """
        Return True if searching images for ``text`` yields ``imgId``.

        ``imgId`` is passed through ``unwrap`` first, so both plain ids and
        the wrapped ids returned by addDummyImage() are accepted.
        """
        imgId = unwrap(imgId)
        results = self.conn.searchObjects(["Image"], text)
        return any(x.getId() == imgId for x in results)

    def desc_test(self):
        """
        Check that a description stays searchable across a large attachment.

        Sets a random description, waits and asserts it is found; then
        attaches the dummy file, appends a second random word to the
        description, waits and asserts the second word is found.
        """
        before = self.get_random_word()
        after = self.get_random_word()
        imgId = self.addDummyImage()
        img = self.queryService.get("Image", imgId.val)
        img.description = rstring(before)
        img = self.updateService.saveAndReturnObject(img)
        time.sleep(self.INDEX_WAIT_SECONDS)
        assert self.found(imgId, before)
        self.attachDummyFileToImage(imgId, self.DUMMY_FILE_MB)
        img.description = rstring(before + " " + after)
        img = self.updateService.saveAndReturnObject(img)
        time.sleep(self.INDEX_WAIT_SECONDS)
        assert self.found(imgId, after)

    def tags_test(self):
        """
        Check that tags stay searchable across a large attachment.

        Tags a new image, waits and asserts the tag is found; then attaches
        the dummy file, adds a second tag, waits and asserts it is found.
        """
        before = self.get_random_word()
        after = self.get_random_word()
        imgId = self.addDummyImage()
        self.tagImage(imgId, before)
        time.sleep(self.INDEX_WAIT_SECONDS)
        assert self.found(imgId, before)
        self.attachDummyFileToImage(imgId, self.DUMMY_FILE_MB)
        self.tagImage(imgId, after)
        time.sleep(self.INDEX_WAIT_SECONDS)
        assert self.found(imgId, after)

    def test(self):
        """
        Report which words of each existing image cannot be found by search.

        Writes one line per image to stdout: ``Image: <id>`` followed by a
        ``<field>: <found> of <total>`` entry for every field with misses
        (written by find_string), or by ``ok`` when nothing was missed.
        """
        count = 0
        for img in self.conn.getObjects("Image"):
            # NOTE(review): the indentation of this method was lost in the
            # copy this was reconstructed from. As laid out here `count`
            # is set once and never read; confirm whether it was meant to
            # limit the run to the first image.
            if not count:
                count += 1
            sys.stdout.write("Image: %s" % img.getId())
            missed = 0
            missed += self.find_string(img, "name", img.getName())
            missed += self.find_string(
                img, "description", img.getDescription())
            tags = self.load_tags(img)
            missed += self.find_string(img, "annotation", tags)
            if missed == 0:
                sys.stdout.write(" ok")
            # Always end the line so the report for an image with misses
            # does not run into the next image's report.
            sys.stdout.write("\n")

    def load_tags(self, img):
        """
        Return the text values of the annotations linked to ``img``.

        Rows that are empty or whose text value is null are skipped, so
        the result only contains values that can be searched for.
        """
        tags = self.queryService.projection((
            "select t.textValue from ImageAnnotationLink l join l.child t "
            "join l.parent as i where i.id = :id"
        ), omero.sys.ParametersI().addId(img.getId()))
        return [x[0] for x in unwrap(tags) if x and x[0] is not None]

    def find_string(self, img, field, text):
        """
        Search for every word of ``text`` as ``<field>:<word>``.

        ``text`` may be a string (split on spaces, blanks dropped), an
        iterable of words, or None (treated as no words). When words are
        missed, `` <field>: <found> of <total>`` is appended to the current
        stdout line. Returns the number of words not found for ``img``.
        """
        if text is None:
            parts = []
        elif isinstance(text, str):
            parts = [x for x in text.split(" ") if x.strip()]
        else:
            parts = list(text)

        found = 0
        for part in parts:
            if self.found(img.getId(), "%s:%s" % (field, part)):
                found += 1

        missed = len(parts) - found
        if missed:
            results = "%s of %s" % (found, len(parts))
            # Leading space separates this entry from what test() has
            # already written on the same line.
            sys.stdout.write(" %s: %s" % (field, results))
        return missed

    def full_test(self):
        """
        Try to reproduce a corrupted search index and print the outcome.

        Image 1 gets tag "testing1", the dummy file, then tag "testing2";
        image 2 gets tag "testing2" only. The index counts as corrupted
        when both images are found by their first tag but image 1 is not
        found by "testing2".
        """
        imgId1 = self.addDummyImage()
        self.tagImage(imgId1, "testing1")
        self.attachDummyFileToImage(imgId1, self.DUMMY_FILE_MB)
        self.tagImage(imgId1, "testing2")
        imgId2 = self.addDummyImage()
        self.tagImage(imgId2, "testing2")
        for _ in range(self.INDEX_POLL_ATTEMPTS):
            # Stupid-simple wait for indexer to catch up...
            time.sleep(self.INDEX_WAIT_SECONDS)
            if self.found(imgId2, "testing2"):
                break
        if (self.found(imgId1, "testing1")
                and self.found(imgId2, "testing2")
                and not self.found(imgId1, "testing2")):
            print("Search index corrupted for image #%i" % imgId1.getValue())
        else:
            print("Problem corrupting index for image #%i" % imgId1.getValue())
if __name__ == "__main__":
    import argparse

    # One boolean switch per scenario; with no switch the full test runs.
    cli = argparse.ArgumentParser()
    for flag in ("--desc", "--full", "--tags", "--test"):
        cli.add_argument(flag, action="store_true")
    opts = cli.parse_args()

    fixture = Fixture()
    # When several switches are given, the first match in this order wins:
    # --desc, then --test, then --tags; anything else runs the full test.
    if opts.desc:
        scenario = fixture.desc_test
    elif opts.test:
        scenario = fixture.test
    elif opts.tags:
        scenario = fixture.tags_test
    else:
        scenario = fixture.full_test
    scenario()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment