|
#!/usr/bin/env python |
|
# -*- coding: utf-8 -*- |
|
# ----------------------------------------------------------------------------- |
|
# Copyright (C) 2018 University of Dundee. All rights reserved. |
|
# |
|
# This program is free software; you can redistribute it and/or modify |
|
# it under the terms of the GNU General Public License as published by |
|
# the Free Software Foundation; either version 2 of the License, or |
|
# (at your option) any later version. |
|
# This program is distributed in the hope that it will be useful, |
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
# GNU General Public License for more details. |
|
|
|
# You should have received a copy of the GNU General Public License along |
|
# with this program; if not, write to the Free Software Foundation, Inc., |
|
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
|
# |
|
# ------------------------------------------------------------------------------ |
|
|
|
"""This script merges duplicate Tags and moves Images to remaining Tag.""" |
|
# https://www.openmicroscopy.org/community/viewtopic.php?f=4&t=8478&p=19225#p19225 |
|
|
|
|
|
from omero.gateway import BlitzGateway |
|
from omero import ValidationException |
|
from omero.model import TagAnnotationI, ImageAnnotationLinkI, ImageI |
|
from omero.rtypes import rstring |
|
import omero.scripts as scripts |
|
|
|
def merge_tags(conn, params):
    """Merge duplicate Tags that share a name and return a status message.

    The first Tag found with the requested text is kept. For every other
    Tag with the same text, each Image linked to it is linked to the kept
    Tag instead, and the duplicate Tag is then deleted. Only Image links
    are transferred.

    :param conn: gateway connection used to query, save and delete
                 (run_script passes a BlitzGateway).
    :param params: dict of script inputs; must contain 'Tag_Name'.
    :return: message string describing the outcome.
    """
    tag_text = params['Tag_Name']
    update_service = conn.getUpdateService()

    # Find Tags by name
    tags = list(
        conn.getObjects("TagAnnotation", attributes={"textValue": tag_text}))

    if len(tags) < 2:
        return "Found fewer than 2 Tags of %s " % tag_text

    # Keep the first tag, delete the others, moving links to the kept tag
    to_keep = tags[0]
    print('to keep %s %s' % (to_keep.id, to_keep.textValue))

    to_delete = []

    # Go through all the tags we're deleting...
    for tag in tags[1:]:
        print("Transfer images from Tag: %s" % tag.id)
        # Create new links between 'to_keep' and tagged objects.
        # Just handle images for now...
        for link in conn.getAnnotationLinks("Image", ann_ids=[tag.id]):
            new_link = ImageAnnotationLinkI()
            # Unloaded (id-only) objects are enough to define the link ends
            new_link.child = TagAnnotationI(to_keep.id, False)
            new_link.parent = ImageI(link.parent.id, False)
            try:
                update_service.saveObject(new_link)
            except ValidationException:
                # Ignore failures where link already exists
                print(" link already exists to image: %s" % link.parent.id)
        to_delete.append(tag.id)

    # Do the deletion. wait=True blocks until the server has processed the
    # delete and closes the command handle, so the message returned below is
    # accurate and the caller can safely close the session afterwards.
    print("Deleting Tags %s" % (to_delete,))
    conn.deleteObjects("Annotation", to_delete, wait=True)

    return "Deleted %s Tags" % len(to_delete)
|
|
|
|
|
def run_script():
    """The main entry point of the script, as called by the client.

    Declares the script's single required input ('Tag_Name'), runs
    merge_tags() with the unwrapped inputs and returns its message as the
    'Message' output. The client session is always closed on exit.
    """
    client = scripts.client(
        'Merge_Tags.py',
        """Delete duplicate Tags, moving Images to remaining Tag""",

        scripts.String(
            "Tag_Name", optional=False, grouping="1",
            description="Name of the Tags you want to merge."),

        authors=["William Moore", "OME Team"],
        institutions=["University of Dundee"],
        # NOTE(review): this looks like an obfuscated e-mail placeholder;
        # confirm the intended contact address.
        contact="[email protected]",
    )

    try:
        conn = BlitzGateway(client_obj=client)

        script_params = client.getInputs(unwrap=True)
        # Single-argument call form prints the same on Python 2 and 3
        print(script_params)

        # call the main script
        message = merge_tags(conn, script_params)

        # Return message
        client.setOutput("Message", rstring(message))

    finally:
        # Close the session even if the merge raised
        client.closeSession()
|
|
|
|
|
# Run the script only when executed directly, not when imported.
if __name__ == "__main__":
    run_script()