Skip to content

Instantly share code, notes, and snippets.

@joshmoore
Forked from jballanc/corrupt_search.py
Last active August 29, 2015 14:04
Show Gist options
  • Save joshmoore/830c676d51727ae5c839 to your computer and use it in GitHub Desktop.
Save joshmoore/830c676d51727ae5c839 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 Glencoe Software, Inc.
# All rights reserved.
import time
import tempfile
import os
import omero.gateway
import random
import string
from omero.rtypes import rstring
from omero.rtypes import unwrap
HOST = "localhost"
PORT = 4064
USERNAME = "root"
PASSWORD = "ome"
RANDOM_LENGTH = 10
class Fixture(object):
def __init__(self,
username=USERNAME, password=PASSWORD,
host=HOST, port=PORT):
self.conn = omero.gateway.BlitzGateway(username, password, host=host, port=port)
if not self.conn.connect():
print("Problem connecting to OMERO server %s:%s" % (host, port))
self.queryService = self.conn.getQueryService()
self.pixelsService = self.conn.getPixelsService()
self.updateService = self.conn.getUpdateService()
def addDummyImage(self):
"""
Add a single pixel image just so we can attach annotations...
"""
pixelQuery = "from PixelsType as p where p.value='int8'"
pixelType = self.queryService.findByQuery(pixelQuery, None)
imgId = self.pixelsService.createImage(1, 1, 1, 1, range(1), pixelType,
"test_image", None, self.conn.SERVICE_OPTS)
return imgId
def tagImage(self, imgId, tagName):
"""
Attach a tag annotation to an image
"""
image = self.conn.getObject("Image", imgId)
tag = omero.gateway.TagAnnotationWrapper()
tag.setValue(tagName)
image.linkAnnotation(tag)
return tag
def attachDummyFileToImage(self, imgId, fileSizeInMb):
"""
Generates a file of set size and attaches it to an image
"""
dummyStr = "This is only a test... "
tmpPath = os.path.join(tempfile.gettempdir(), "tmp.txt")
tmp = open(tmpPath, "w")
tmp.writelines(["".join([dummyStr for _ in
range(1024 * 1024 / len(dummyStr))])
for _ in range(fileSizeInMb)])
fileAnn = self.conn.createFileAnnfromLocalFile(tmpPath, mimetype="text/plain")
image = self.conn.getObject("Image", imgId)
image.linkAnnotation(fileAnn)
os.remove(tmpPath)
def get_random_word(self, wordLen=RANDOM_LENGTH):
word = ''
for i in range(wordLen):
word += random.choice(string.ascii_letters)
return word
def found(self, imgId, text):
imgId = unwrap(imgId)
results = self.conn.searchObjects(["Image"], text)
return imgId in [x.getId() for x in results]
def desc_test(self):
before = self.get_random_word()
after = self.get_random_word()
imgId = self.addDummyImage()
img = self.queryService.get("Image", imgId.val)
img.description = rstring(before)
img = self.updateService.saveAndReturnObject(img)
time.sleep(10)
assert self.found(imgId, before)
self.attachDummyFileToImage(imgId, 510)
img.description = rstring(before + " " + after)
img = self.updateService.saveAndReturnObject(img)
time.sleep(10)
assert self.found(imgId, after)
def tags_test(self):
before = self.get_random_word()
after = self.get_random_word()
imgId = self.addDummyImage()
self.tagImage(imgId, before)
time.sleep(10)
assert self.found(imgId, before)
self.attachDummyFileToImage(imgId, 510)
self.tagImage(imgId, after)
time.sleep(10)
assert self.found(imgId, after)
def test(self):
count = 0
for img in self.conn.getObjects("Image"):
if not count:
print
count += 1
print "Image:", img.getId(),
missed = 0
missed += self.find_string(img, "name", img.getName())
missed += self.find_string(img, "description", img.getDescription())
tags = self.load_tags(img)
missed += self.find_string(img, "annotation", tags)
if missed == 0:
print "ok"
def load_tags(self, img):
tags = self.queryService.projection((
"select t.textValue from ImageAnnotationLink l join l.child t "
"join l.parent as i where i.id = :id"
), omero.sys.ParametersI().addId(img.getId()))
parts = []
for x in unwrap(tags):
if x:
parts.append(x[0])
return parts
def find_string(self, img, field, text):
if isinstance(text, str):
parts = text.split(" ")
parts = [x for x in parts if x.strip()]
else:
parts = text
found = 0
for part in parts:
if self.found(img.getId(), "%s:%s" % (field, part)):
found += 1
results = "%s of %s" % (found, len(parts))
if found != len(parts):
print "%s: %s" % (field, results),
return len(parts) - found
else:
return 0
def full_test(self):
imgId1 = self.addDummyImage()
self.tagImage(imgId1, "testing1")
self.attachDummyFileToImage(imgId1, 510)
self.tagImage(imgId1, "testing2")
imgId2 = self.addDummyImage()
self.tagImage(imgId2, "testing2")
for x in range(10):
time.sleep(10) # Stupid-simple wait for indexer to catch up...
if self.found(imgId2, "testing2"):
break
if (self.found(imgId1, "testing1")
and self.found(imgId2, "testing2")
and not self.found(imgId1, "testing2")):
print("Search index corrupted for image #%i" % imgId1.getValue())
else:
print("Problem corrupting index for image #%i" % imgId1.getValue())
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--desc", action="store_true")
parser.add_argument("--full", action="store_true")
parser.add_argument("--tags", action="store_true")
parser.add_argument("--test", action="store_true")
ns = parser.parse_args()
f = Fixture()
if ns.desc:
f.desc_test()
elif ns.test:
f.test()
elif ns.tags:
f.tags_test()
else:
f.full_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment