Created
February 23, 2023 17:57
-
-
Save will-moore/56f03cd126dcac9981bceeb8e7cdb393 to your computer and use it in GitHub Desktop.
OMERO: Upload files in a directory to create a new Fileset to replace an existing Fileset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import locale | |
import os | |
import platform | |
import sys | |
import omero.clients | |
from omero.cli import cli_login | |
from omero.model import ChecksumAlgorithmI | |
from omero.model import NamedValue | |
from omero.model.enums import ChecksumAlgorithmSHA1160 | |
from omero.rtypes import rstring, rbool | |
from omero_version import omero_version | |
from omero.gateway import BlitzGateway | |
def get_files_for_fileset(fs_path): | |
filepaths = [] | |
for path, subdirs, files in os.walk(fs_path): | |
for name in files: | |
print(os.path.join(path, name)) | |
# If we want to ignore chunks... | |
# if ".z" in name or ".xml" in name: | |
filepaths.append(os.path.join(path, name)) | |
return filepaths | |
def create_fileset(files): | |
"""Create a new Fileset from local files.""" | |
fileset = omero.model.FilesetI() | |
for f in files: | |
entry = omero.model.FilesetEntryI() | |
entry.setClientPath(rstring(f)) | |
fileset.addFilesetEntry(entry) | |
# Fill version info | |
system, node, release, version, machine, processor = platform.uname() | |
client_version_info = [ | |
NamedValue('omero.version', omero_version), | |
NamedValue('os.name', system), | |
NamedValue('os.version', release), | |
NamedValue('os.architecture', machine) | |
] | |
try: | |
client_version_info.append( | |
NamedValue('locale', locale.getdefaultlocale()[0])) | |
except: | |
pass | |
upload = omero.model.UploadJobI() | |
upload.setVersionInfo(client_version_info) | |
fileset.linkJob(upload) | |
return fileset | |
def create_settings(): | |
"""Create ImportSettings and set some values.""" | |
settings = omero.grid.ImportSettings() | |
settings.doThumbnails = rbool(True) | |
settings.noStatsInfo = rbool(False) | |
settings.userSpecifiedTarget = None | |
settings.userSpecifiedName = None | |
settings.userSpecifiedDescription = None | |
settings.userSpecifiedAnnotationList = None | |
settings.userSpecifiedPixels = None | |
settings.checksumAlgorithm = ChecksumAlgorithmI() | |
s = rstring(ChecksumAlgorithmSHA1160) | |
settings.checksumAlgorithm.value = s | |
return settings | |
def upload_files(proc, files, client): | |
"""Upload files to OMERO from local filesystem.""" | |
ret_val = [] | |
for i, fobj in enumerate(files): | |
rfs = proc.getUploader(i) | |
try: | |
with open(fobj, 'rb') as f: | |
print ('Uploading: %s' % fobj) | |
offset = 0 | |
block = [] | |
rfs.write(block, offset, len(block)) # Touch | |
while True: | |
block = f.read(1000 * 1000) | |
if not block: | |
break | |
rfs.write(block, offset, len(block)) | |
offset += len(block) | |
ret_val.append(client.sha1(fobj)) | |
finally: | |
rfs.close() | |
return ret_val | |
def create_upload_fileset(conn, client, fs_path): | |
"""create a Fileset but don't import""" | |
mrepo = client.getManagedRepository() | |
files = get_files_for_fileset(fs_path) | |
assert files, 'No files found: %s' % fs_path | |
fileset = create_fileset(files) | |
settings = create_settings() | |
mrepo = client.getManagedRepository() | |
proc = mrepo.importFileset(fileset, settings) | |
hashes = [] | |
try: | |
hashes = upload_files(proc, files, client) | |
finally: | |
proc.close() | |
if len(hashes) == 0: | |
print("No files uploaded!") | |
# Use first hash to get Fileset... | |
# first File will typically be the path/to/image.zarr/.zattrs | |
return get_fileset_from_hash(conn, hashes[0]) | |
def get_fileset_from_hash(conn, hash): | |
params = omero.sys.ParametersI() | |
params.addString("hash", hash) | |
query = 'select fse '\ | |
'from FilesetEntry as fse '\ | |
'join fse.fileset as fileset '\ | |
'join fetch fse.originalFile as ofile '\ | |
'where ofile.hash=:hash '\ | |
'order by fileset.id' | |
rows = conn.getQueryService().findAllByQuery(query, params, conn.SERVICE_OPTS) | |
# If this file has been imported before, multiple Filesets will be found | |
# return last one (most recent) | |
last_fse = rows[-1] | |
fileset_id = last_fse.fileset.id.val | |
orig_file = last_fse.originalFile | |
print('orig_file', orig_file.id.val) | |
print('orig_file', orig_file.getPath().val) | |
print('orig_file', orig_file.getName().val) | |
return fileset_id, orig_file | |
def main(argv): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--replacefileset', type=int, help=( | |
'Replace this Fileset...')) | |
parser.add_argument('path', nargs='+', help='Files or directories') | |
args = parser.parse_args(argv) | |
with cli_login() as cli: | |
conn = BlitzGateway(client_obj=cli._client) | |
for fs_path in args.path: | |
old_fs = None | |
if args.replacefileset: | |
old_fs = conn.getObject('Fileset', args.replacefileset) | |
if old_fs is None: | |
print ('Fileset id not found: %s' % args.replacefileset) | |
sys.exit(1) | |
print ('Importing: %s' % fs_path) | |
fileset_id, orig_file = create_upload_fileset(conn, cli._client, fs_path) | |
if old_fs is not None: | |
for image in old_fs.copyImages(): | |
print("Updating image", image.name, image.id) | |
img = image._obj | |
img.fileset = omero.model.FilesetI(fileset_id, False) | |
conn.getUpdateService().saveObject(img, conn.SERVICE_OPTS) | |
# Print the HQL updates we need to update each Pixels to new Fileset file | |
fname = orig_file.getName().val | |
fpath = orig_file.getPath().val | |
pid = image.getPixelsId() | |
print(f"""psql -U postgres -d OMERO-server -c "UPDATE pixels SET name = '{fname}', path = '{fpath}' where id = {pid}""") | |
# delete old fileset... | |
print("Deleting Fileset", old_fs.id) | |
conn.deleteObjects("Fileset", [old_fs.id]) | |
if __name__ == '__main__': | |
main(sys.argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment