Skip to content

Instantly share code, notes, and snippets.

@jballanc
Last active August 29, 2015 14:04
Show Gist options
  • Save jballanc/57bc71e8b5e9e07d1adc to your computer and use it in GitHub Desktop.
Save jballanc/57bc71e8b5e9e07d1adc to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 Glencoe Software, Inc.
# All rights reserved.
import time
import tempfile
import os
import omero.gateway
# Connection settings for the OMERO server this script runs against.
HOST = "localhost"
PORT = 4064
USERNAME = "root"
PASSWORD = "omero"

# Open a Blitz gateway session using the credentials above.
conn = omero.gateway.BlitzGateway(USERNAME, PASSWORD, host=HOST, port=PORT)
if not conn.connect():
    print("Problem connecting to OMERO server %s:%s" % (HOST, PORT))
    # Every later step needs a live session; stop here with a non-zero
    # exit status instead of failing on the service lookups below.
    raise SystemExit(1)

# Service proxies obtained from the connected gateway.
queryService = conn.getQueryService()
pixelsService = conn.getPixelsService()
updateService = conn.getUpdateService()
def addDummyImage():
    """
    Create a minimal placeholder image named "test_image" so that there
    is something to attach annotations to.

    The 'int8' PixelsType is looked up through the query service and
    handed to pixelsService.createImage together with all-ones
    dimensions. Returns the value produced by createImage (callers in
    this script treat it as the new image's id).
    """
    int8Type = queryService.findByQuery(
        "from PixelsType as p where p.value='int8'", None)
    # The four leading size arguments are all 1: a single-pixel image.
    sizes = (1, 1, 1, 1)
    channels = range(1)
    return pixelsService.createImage(
        sizes[0], sizes[1], sizes[2], sizes[3], channels, int8Type,
        "test_image", None, conn.SERVICE_OPTS)
def tagImage(imgId, tagName):
    """
    Link a new tag annotation to an image.

    imgId   -- id of the image, resolved via conn.getObject("Image", ...)
    tagName -- text stored as the tag's value

    Returns the TagAnnotationWrapper that was linked.
    """
    newTag = omero.gateway.TagAnnotationWrapper()
    newTag.setValue(tagName)
    conn.getObject("Image", imgId).linkAnnotation(newTag)
    return newTag
def attachDummyFileToImage(imgId, fileSizeInMb):
    """
    Generate a text file of a set size and attach it to an image.

    imgId        -- id of the image, resolved via conn.getObject("Image", ...)
    fileSizeInMb -- number of ~1 MiB chunks of filler text to write

    The file is written as "tmp.txt" in the system temp directory,
    uploaded as a "text/plain" file annotation, linked to the image and
    then removed. The temp file is removed even when the upload or the
    link step raises. Returns None.
    """
    dummyStr = "This is only a test... "
    # Floor division keeps the repeat count an int on Python 3 as well
    # (true division would yield a float there).
    repeatsPerMb = 1024 * 1024 // len(dummyStr)
    chunk = dummyStr * repeatsPerMb
    tmpPath = os.path.join(tempfile.gettempdir(), "tmp.txt")
    try:
        # The with-block closes (and therefore flushes) the file before
        # it is uploaded, so the annotation sees the complete content.
        with open(tmpPath, "w") as tmp:
            # Write one chunk at a time rather than building the whole
            # payload in memory first.
            for _ in range(fileSizeInMb):
                tmp.write(chunk)
        fileAnn = conn.createFileAnnfromLocalFile(tmpPath,
                                                  mimetype="text/plain")
        image = conn.getObject("Image", imgId)
        image.linkAnnotation(fileAnn)
    finally:
        # Clean up regardless of success; the file may not exist if
        # open() itself failed.
        if os.path.exists(tmpPath):
            os.remove(tmpPath)
# Scenario: the first image gets tag "testing1", a 510 MB file attachment
# and then tag "testing2"; the second image only gets tag "testing2".
imgId1 = addDummyImage()
tagImage(imgId1, "testing1")
attachDummyFileToImage(imgId1, 510)
tagImage(imgId1, "testing2")
imgId2 = addDummyImage()
tagImage(imgId2, "testing2")

firstId = imgId1.getValue()
secondId = imgId2.getValue()

# Poll every 10 seconds until the second image is returned by a search
# for "testing2" (simple wait for the indexer to catch up).
secondIndexed = False
while not secondIndexed:
    time.sleep(10)
    testing2 = conn.searchObjects(["Image"], "testing2")
    secondIndexed = secondId in [hit.getId() for hit in testing2]

# Fresh searches for both tags, then decide which message to report.
testing1 = conn.searchObjects(["Image"], "testing1")
testing2 = conn.searchObjects(["Image"], "testing2")
# "Corrupted" means: the first image is found via "testing1", the second
# via "testing2", yet the first is missing from the "testing2" results.
corrupted = (
    firstId in [hit.getId() for hit in testing1]
    and secondId in [hit.getId() for hit in testing2]
    and firstId not in [hit.getId() for hit in testing2]
)
if corrupted:
    verdict = "Search index corrupted for image #%i"
else:
    verdict = "Problem corrupting index for image #%i"
print(verdict % firstId)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment