Created
January 19, 2012 09:51
-
-
Save bennihepp/1639109 to your computer and use it in GitHub Desktop.
CellProfiler module to execute small python scripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''<b>RunScript</b> - an easy way to write scripts for CellProfiler | |
<hr> | |
This module allows you to write small python scripts that are run as part | |
of the CellProfiler pipeline. | |
''' | |
################################# | |
# | |
# Imports from useful Python libraries | |
# | |
################################# | |
import os | |
import itertools | |
import numpy as np | |
import scipy as sp | |
################################# | |
# | |
# Imports from CellProfiler | |
# | |
# The package aliases are the standard ones we use | |
# throughout the code. | |
# | |
################################## | |
import cellprofiler.cpimage as cpi | |
import cellprofiler.cpmodule as cpm | |
import cellprofiler.measurements as cpmeas | |
import cellprofiler.objects as cpo | |
import cellprofiler.settings as cps | |
from cellprofiler.preferences import \ | |
DEFAULT_INPUT_FOLDER_NAME, DEFAULT_OUTPUT_FOLDER_NAME, NO_FOLDER_NAME, \ | |
ABSOLUTE_FOLDER_NAME | |
# Folder choices passed as ``dir_choices`` to the script directory setting.
DIR_ALL = [
    DEFAULT_INPUT_FOLDER_NAME,
    DEFAULT_OUTPUT_FOLDER_NAME,
    NO_FOLDER_NAME,
    ABSOLUTE_FOLDER_NAME,
]

# Choices for which part of an input object set is handed to the script:
# the Objects instance itself or one of its label matrices.
WC_OBJECTS = 'CellProfiler Objects'
WC_FILTERED_OBJECTS = 'Filtered segmentation'
WC_UNFILTERED_OBJECTS = 'Unfiltered segmentation'
WC_SMALLREMOVED_OBJECTS = 'Segmentation with small objects removed'

# Choices for the "Run script in debug mode?" setting.
WD_NO = "No"
WD_PDB = "Use pdb"
WD_WINGDB = "Use wingdbstub"
WD_RPDB2 = "Use rpdb2"

# Choices for how the text of an input constant is converted before it is
# handed to the script.
WT_FLOAT = "Float"
WT_INT = "Integer"
WT_LIST = "List"
WT_DICT = "Dictionary"
WT_NPARRAY = "Numpy float array"
WT_FLOATDICT = "Dictionary with float values"
WT_STRING = "String"

# NOTE(review): not referenced anywhere else in this file; presumably the
# number of fixed settings that follow the hidden counts -- TODO confirm.
SETTINGS_OFFSET = 5

# Column types offered for measurements produced by the script.
MT_TYPES = [
    cpmeas.COLTYPE_FLOAT,
    cpmeas.COLTYPE_INTEGER,
]
class RunScript(cpm.CPModule):
    '''CellProfiler module that runs a user supplied Python script.

    Images, objects, measurements and constants selected in the settings
    are bound to variable names of the user's choosing before the script is
    executed.  After the script has run, the variables named in the output
    settings are read back and stored as images, objects and measurements
    in the workspace.
    '''

    module_name = "RunScript"
    category = "Other"
    variable_revision_number = 1

    class AttributeDict(dict):
        '''Dictionary whose items can also be accessed as attributes.

        Used for the containers created for dotted variable names, so that
        a value bound to "a.b" can be read in the script as ``a.b``.
        '''
        __getattr__ = dict.__getitem__
        __setattr__ = dict.__setitem__

    def create_settings(self):
        '''Create the fixed settings and the (initially empty) groups.'''
        # add choice for debugging modes
        self.wants_debug_mode = cps.Choice(
            "Run script in debug mode?",
            [WD_NO, WD_PDB, WD_WINGDB, WD_RPDB2])

        # add containers for groups
        self.input_image_groups = []
        self.input_object_groups = []
        self.input_measurement_groups = []
        self.input_constant_groups = []
        self.output_image_groups = []
        self.output_object_groups = []
        self.output_measurement_groups = []

        # add hidden counts for groups
        self.input_image_count = cps.HiddenCount(
            self.input_image_groups, 'Input image count')
        self.input_object_count = cps.HiddenCount(
            self.input_object_groups, 'input object count')
        self.input_measurement_count = cps.HiddenCount(
            self.input_measurement_groups, 'Input measurement count')
        self.input_constant_count = cps.HiddenCount(
            self.input_constant_groups, 'Input constant count')
        self.output_image_count = cps.HiddenCount(
            self.output_image_groups, 'Output image count')
        self.output_object_count = cps.HiddenCount(
            self.output_object_groups, 'Output object count')
        self.output_measurement_count = cps.HiddenCount(
            self.output_measurement_groups, 'Output measurement count')

        # add buttons for adding inputs and outputs
        self.add_input_image = cps.DoSomething(
            "", "Add another input image",
            self.add_input_image_cb)
        self.add_input_object = cps.DoSomething(
            "", "Add another input object",
            self.add_input_object_cb)
        self.add_input_measurement = cps.DoSomething(
            "", "Add another input measurement",
            self.add_input_measurement_cb)
        self.add_input_constant = cps.DoSomething(
            "", "Add another input constant",
            self.add_input_constant_cb)
        self.input_output_divider = cps.Divider()
        self.add_output_image = cps.DoSomething(
            "", "Add another output image",
            self.add_output_image_cb)
        self.add_output_object = cps.DoSomething(
            "", "Add another output object",
            self.add_output_object_cb)
        self.add_output_measurement = cps.DoSomething(
            "", "Add another output measurement",
            self.add_output_measurement_cb)

        # add binary choice for having a text field to enter the script
        # or load the script from a file
        self.script_choice = cps.Binary(
            "Load script from a file?",
            True,
            doc="""You can either load the script from """
                """a file or enter it directly into a text box."""
        )
        # add directory input box for loading script
        self.script_dir = cps.DirectoryPath(
            "Name of the script file directory",
            dir_choices=DIR_ALL,
            allow_metadata=False
        )
        # add file input box for loading script
        self.script_file = cps.FilenameText(
            "Name of the script file",
            "None",
            exts=[("Script file (*.py)", "*.py"), ("Any file (*)", "*")]
        )

        # add text field for entering script.
        # keyword arguments for the text field
        text_kwargs = {'doc': "Enter the python script to be run",
                       'multiline': True}
        # check if we are allowed to change the size of the text field
        if getattr(cps.Text, '__change_size_support__', False):
            text_kwargs['size'] = (600, 400)
        self.script_text = cps.Text(
            "Script to run:",
            "results = 5",
            **text_kwargs
        )

        # One entry per kind of group, in the same order as the hidden
        # counts returned by settings():
        #   (add button, list of groups, attribute names that are saved,
        #    attribute names that are displayed, callback adding a group)
        self.__settings = (
            (
                self.add_input_image, self.input_image_groups,
                ("image", "py_name", "wants_raw"),
                ("divider", "image", "py_name", "wants_raw", "remover"),
                self.add_input_image_cb
            ),
            (
                self.add_input_object, self.input_object_groups,
                ("objects", "py_name", "objects_choice"),
                ("divider", "objects", "py_name", "objects_choice",
                 "remover"),
                self.add_input_object_cb
            ),
            (
                self.add_input_measurement, self.input_measurement_groups,
                ("wants_image", "use_object_name", "measurement", "py_name"),
                ("divider", "wants_image", "use_object_name",
                 "measurement", "py_name", "remover"),
                self.add_input_measurement_cb
            ),
            (
                self.add_input_constant, self.input_constant_groups,
                ("constant", "type_choice", "py_name"),
                ("divider", "constant", "type_choice", "py_name", "remover"),
                self.add_input_constant_cb
            ),
            (
                self.add_output_image, self.output_image_groups,
                ("image_name", "py_name"),
                ("divider", "image_name", "py_name", "remover"),
                self.add_output_image_cb
            ),
            (
                self.add_output_object, self.output_object_groups,
                ("objects_name", "py_name"),
                ("divider", "objects_name", "py_name", "remover"),
                self.add_output_object_cb
            ),
            (
                self.add_output_measurement, self.output_measurement_groups,
                ("image", "wants_image", "use_object_name", "type_choice",
                 "measurement_category", "measurement_name", "py_name"),
                ("divider", "image", "wants_image", "use_object_name",
                 "type_choice", "measurement_category", "measurement_name",
                 "py_name", "remover"),
                self.add_output_measurement_cb
            ),
        )

    def settings(self):
        '''Return the settings in the order in which they are saved.

        The seven hidden group counts come first, then the five fixed
        settings, then the saved attributes of every group.
        '''
        result = [
            self.input_image_count,
            self.input_object_count,
            self.input_measurement_count,
            self.input_constant_count,
            self.output_image_count,
            self.output_object_count,
            self.output_measurement_count,
            self.wants_debug_mode,
            self.script_choice,
            self.script_dir,
            self.script_file,
            self.script_text,
        ]
        for _btn, groups, attr_names, _visible, _add_cb in self.__settings:
            for group in groups:
                for attr_name in attr_names:
                    result.append(getattr(group, attr_name))
        return result

    def visible_settings(self):
        '''Return the settings that are displayed to the user.

        Within a group the "use_object_name" setting is hidden when the
        group's "wants_image" setting is checked.
        '''
        result = [self.wants_debug_mode]
        for group_btn, groups, _attrs, visible_attr_names, _add_cb in \
                self.__settings:
            for group in groups:
                hide_object_name = False
                for attr_name in visible_attr_names:
                    setting = getattr(group, attr_name)
                    if attr_name == 'wants_image' and setting.value:
                        hide_object_name = True
                    if attr_name == 'use_object_name' and hide_object_name:
                        continue
                    result.append(setting)
            # NOTE(review): the nesting of the next two appends was
            # reconstructed from a source whose indentation was lost --
            # confirm that the button is meant to follow each kind of group
            # and the divider to appear once after all of them.
            result.append(group_btn)
        result.append(cps.Divider())
        result.append(self.script_choice)
        if self.script_choice.value:
            result += [self.script_dir, self.script_file]
        else:
            result.append(self.script_text)
        return result

    def prepare_settings(self, setting_values):
        '''Prepare the module to receive the settings

        setting_values - one string per setting to be initialized

        Adjust the number of input and output groups to match the counts
        stored at the start of setting_values.
        '''
        # The leading values are the hidden counts, one per entry of
        # self.__settings and in the same order.
        counts = [int(value)
                  for value in setting_values[:len(self.__settings)]]
        for count, entry in zip(counts, self.__settings):
            groups, add_cb = entry[1], entry[4]
            del groups[count:]
            while len(groups) < count:
                add_cb()

    def add_input_image_cb(self):
        '''Add an image to the input_image_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        group.append('image', cps.ImageNameSubscriber(
            "Select an input image to use", None,
            doc="""Select an image you want to use as input in your script"""
        ))
        group.append('py_name', cps.Text(
            "Name for the object-variable in Python",
            "cp_image_in",
            doc="""Select the name of the variable that """
                """can be used to access the image. The pixels can """
                """then be accessed in python as <py_name>.pixel_data."""
        ))
        group.append('wants_raw', cps.Binary(
            "Access raw pixel data?", False,
            doc="You can choose to access the raw pixel data "
                "instead of the CPImage object, where the pixels "
                "can be accessed as 'image.pixel_data'."
        ))
        group.append('remover', cps.RemoveSettingButton(
            "", "Remove this image", self.input_image_groups, group
        ))
        self.input_image_groups.append(group)

    def add_input_object_cb(self):
        '''Add an object to the input_object_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        group.append('objects', cps.ObjectNameSubscriber(
            "Select an input object to use", None,
            doc="""Select an object you want to use as input in your """
                """script"""
        ))
        group.append('py_name', cps.Text(
            "Name for the object-variable in Python",
            "cp_object_in",
            doc="""Select the name of the variable that """
                """can be used to access the object"""
        ))
        group.append('objects_choice', cps.Choice(
            'What part of the objects do you want to access?',
            [WC_OBJECTS,
             WC_FILTERED_OBJECTS,
             WC_UNFILTERED_OBJECTS,
             WC_SMALLREMOVED_OBJECTS]
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this object", self.input_object_groups, group
        ))
        self.input_object_groups.append(group)

    def add_input_measurement_cb(self):
        '''Add a measurement to the input_measurement_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        wants_image_measurement = cps.Binary(
            "Use an image measurement?", True)
        group.append('wants_image', wants_image_measurement)
        use_object_name = cps.ObjectNameSubscriber("Object name:")
        group.append('use_object_name', use_object_name)

        def object_fn():
            # Tells the measurement setting which object's measurements
            # to offer: the image's or those of the selected objects.
            if wants_image_measurement.value:
                return cpmeas.IMAGE
            return use_object_name.value

        group.append('measurement', cps.Measurement(
            "Select an input measurement to use", object_fn,
            doc="""Select a measurement you want to use as input in """
                """your script"""
        ))
        group.append('py_name', cps.Text(
            "Name for the measurement-variable in Python",
            "cp_measurement_in",
            doc="""Select the name of the variable that """
                """can be used to access the measurement"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this measurement",
            self.input_measurement_groups, group
        ))
        self.input_measurement_groups.append(group)

    def add_input_constant_cb(self):
        '''Add a constant to the input_constant_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        group.append('constant', cps.Text(
            "Enter the constant input to use",
            "1.0",
            doc="""Enter a constant you want to use as input in your """
                """script"""
        ))
        group.append('type_choice', cps.Choice(
            "Choose the type of the constant",
            [WT_FLOAT,
             WT_INT,
             WT_LIST,
             WT_DICT,
             WT_NPARRAY,
             WT_FLOATDICT,
             WT_STRING]
        ))
        group.append('py_name', cps.Text(
            "Name for the constant-variable in Python",
            "cp_constant_in",
            doc="""Select the name of the variable that """
                """can be used to access the constant"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this constant", self.input_constant_groups, group
        ))
        self.input_constant_groups.append(group)

    def add_output_image_cb(self):
        '''Add an image to the output_image_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        group.append('image_name', cps.ImageNameProvider(
            "Output image name:",
            "OutputImage"
        ))
        group.append('py_name', cps.Text(
            "Name for the image-variable in Python",
            "cp_image_out",
            doc="""Select the name of the variable that """
                """can be used to access the image"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this image", self.output_image_groups, group
        ))
        self.output_image_groups.append(group)

    def add_output_object_cb(self):
        '''Add an object to the output_object_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        group.append('objects_name', cps.ObjectNameProvider(
            "Name the output objects:",
            "OutputObject"
        ))
        group.append('py_name', cps.Text(
            "Name for the object-variable in Python",
            "cp_object_out",
            doc="""Select the name of the variable that """
                """can be used to access the object"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this object", self.output_object_groups, group
        ))
        self.output_object_groups.append(group)

    def add_output_measurement_cb(self):
        '''Add a measurement to output_measurement_groups collection'''
        group = cps.SettingsGroup()
        group.append("divider", cps.Divider())
        group.append('image', cps.ImageNameSubscriber(
            "Image to measure", None,
            doc="""Select an image on which to perform the measurement"""
        ))
        group.append('wants_image', cps.Binary(
            "Create an image measurement?", False
        ))
        group.append('use_object_name', cps.ObjectNameSubscriber(
            "Object name",
            doc="""Select an object on which to perform the measurement"""
        ))
        group.append('measurement_category', cps.Text(
            "Measurement category:",
            "RunScript"
        ))
        group.append('type_choice', cps.Choice(
            "Choose the type of the measurement",
            MT_TYPES
        ))
        group.append('measurement_name', cps.Text(
            "Measurement name:",
            "OutputMeasurement"
        ))
        group.append('py_name', cps.Text(
            "Name for the measurement-variable in Python",
            "cp_measurement_out",
            doc="""Select the name of the variable that """
                """can be used to access the measurement"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this measurement",
            self.output_measurement_groups, group
        ))
        self.output_measurement_groups.append(group)

    @staticmethod
    def _measurement_object_name(group):
        '''Return the object name a measurement group refers to.

        This is cpmeas.IMAGE when the group's "wants_image" setting is
        checked and the value of its "use_object_name" setting otherwise.
        '''
        if group.wants_image.value:
            return cpmeas.IMAGE
        return group.use_object_name.value

    @staticmethod
    def _measurement_feature_name(group):
        '''Return "<category>_<name>_<image>" for an output measurement.'''
        return '%s_%s_%s' % (group.measurement_category.value,
                             group.measurement_name.value,
                             group.image.value)

    def get_measurement_columns(self, pipeline):
        '''Return column definitions for measurements made by this module'''
        return [
            (self._measurement_object_name(group),
             self._measurement_feature_name(group),
             group.type_choice.value)
            for group in self.output_measurement_groups
        ]

    def get_categories(self, pipeline, object_name):
        '''Return the categories of the output measurements of object_name'''
        return [
            group.measurement_category.value
            for group in self.output_measurement_groups
            if object_name == self._measurement_object_name(group)
        ]

    def get_measurements(self, pipeline, object_name, category):
        '''Return the output measurement names for object_name/category'''
        return [
            group.measurement_name.value
            for group in self.output_measurement_groups
            if object_name == self._measurement_object_name(group)
            and category == group.measurement_category.value
        ]

    def get_measurement_images(self, pipeline, object_name,
                               category, measurement):
        '''Return the image names recorded for the given measurement'''
        return [
            group.image.value
            for group in self.output_measurement_groups
            if object_name == self._measurement_object_name(group)
            and category == group.measurement_category.value
            and measurement == group.measurement_name.value
        ]

    #
    # CellProfiler calls "run" on each image set in your pipeline.
    # This is where you do the real work.
    #
    def run(self, workspace):
        '''Execute the script for the current image set.

        workspace - provides the image set, object set and measurements
                    that the inputs are read from and the outputs are
                    written to

        Raises KeyError if the script does not define a variable that is
        named in one of the output settings.  Any exception raised while
        loading, compiling or running the script propagates to the caller.
        '''
        # modules that are available to the script without importing
        global_ns = {
            'cpi': cpi,
            'cpm': cpm,
            'cpmeas': cpmeas,
            'cpo': cpo,
            'cps': cps,
            'np': np,
            'sp': sp,
        }
        user_ns = self._collect_script_inputs(workspace)
        script_name, script = self._load_script()

        # maybe add some debugging stuff; the script is left untouched
        # when no debugger was requested
        preamble = self._debug_preamble()
        if preamble:
            script = preamble + script
        print('script: %s' % script)

        # NOTE(review): the script is arbitrary Python code and runs with
        # the full privileges of the CellProfiler process.
        codeobj = compile(script, script_name, 'exec')
        # The call form is valid on Python 2 and 3.  It is kept in a
        # function without nested functions because older Python 2.7
        # releases reject this form next to closures.
        exec(codeobj, global_ns, user_ns)

        self._publish_script_outputs(workspace, user_ns)

    @staticmethod
    def _bind_name(namespace, name, value):
        '''Bind value to a (possibly dotted) name in namespace.

        For a name like "a.b.c" the containers "a" and "a.b" are created
        as AttributeDict instances if they do not exist yet.  Existing
        containers are reused, so several values can share a prefix.
        '''
        name_parts = name.split('.')
        target = namespace
        for part in name_parts[:-1]:
            # look the part up at the current nesting level, not at the
            # top level, otherwise existing containers get replaced
            if part not in target:
                target[part] = RunScript.AttributeDict()
            target = target[part]
        target[name_parts[-1]] = value

    @staticmethod
    def _parse_constant(text, type_choice):
        '''Convert the text of a constant according to its type choice.

        Lists, arrays and dictionaries are entered as comma separated
        items; dictionary items have the form "key:value".  Text with any
        other type choice (WT_STRING) is returned unchanged.
        '''
        if type_choice == WT_FLOAT:
            return float(text)
        if type_choice == WT_INT:
            return int(text)
        if type_choice == WT_LIST:
            return text.split(',')
        if type_choice == WT_DICT:
            return dict(item.split(':') for item in text.split(','))
        if type_choice == WT_NPARRAY:
            return np.array([float(item) for item in text.split(',')])
        if type_choice == WT_FLOATDICT:
            pairs = (item.split(':') for item in text.split(','))
            return dict((key, float(value)) for key, value in pairs)
        return text

    def _collect_script_inputs(self, workspace):
        '''Return the namespace holding all configured script inputs.'''
        user_ns = {}

        # add input images to the script namespace
        for group in self.input_image_groups:
            image = workspace.image_set.get_image(group.image.value)
            if group.wants_raw.value:
                image = image.pixel_data
            self._bind_name(user_ns, group.py_name.value, image)

        # add input objects to the script namespace
        for group in self.input_object_groups:
            objects = workspace.object_set.get_objects(group.objects.value)
            choice = group.objects_choice.value
            if choice == WC_FILTERED_OBJECTS:
                objects = objects.segmented
            elif choice == WC_UNFILTERED_OBJECTS:
                objects = objects.unedited_segmented
            elif choice == WC_SMALLREMOVED_OBJECTS:
                objects = objects.small_removed_segmented
            self._bind_name(user_ns, group.py_name.value, objects)

        # add input measurements to the script namespace
        for group in self.input_measurement_groups:
            measurement = workspace.measurements.get_current_measurement(
                self._measurement_object_name(group),
                group.measurement.value)
            self._bind_name(user_ns, group.py_name.value, measurement)

        # add input constants to the script namespace
        for group in self.input_constant_groups:
            constant = self._parse_constant(group.constant.value,
                                            group.type_choice.value)
            self._bind_name(user_ns, group.py_name.value, constant)

        return user_ns

    def _load_script(self):
        '''Return (name used for compiling, source text) of the script.'''
        if not self.script_choice.value:
            return '<runscript script>', self.script_text.value
        script_name = self.script_file.value
        path = os.path.join(self.script_dir.get_absolute_path(),
                            script_name)
        print('script_name: %s' % path)
        with open(path) as f:
            return script_name, f.read()

    def _debug_preamble(self):
        '''Return the code that starts the selected debugger, if any.

        Every preamble ends with a newline so that the first line of the
        script starts on a line of its own.
        '''
        debug_mode = self.wants_debug_mode.value
        if debug_mode == WD_PDB:
            return u"import pdb\npdb.set_trace()\n"
        if debug_mode == WD_WINGDB:
            return u"import wingdbstub\nwingdbstub.debugger.Break()\n"
        if debug_mode == WD_RPDB2:
            return u"import rpdb2\nrpdb2.set_trace()\n"
        return u""

    @staticmethod
    def _script_result(user_ns, py_name, kind):
        '''Return the value the script bound to py_name.

        Raises KeyError with a descriptive message if the script did not
        define the variable.
        '''
        try:
            return user_ns[py_name]
        except KeyError:
            raise KeyError(
                "The script did not define the output %s variable '%s'"
                % (kind, py_name))

    def _publish_script_outputs(self, workspace, user_ns):
        '''Store the configured outputs of the script in the workspace.'''
        # retrieve output images from the script namespace
        for group in self.output_image_groups:
            image = self._script_result(
                user_ns, group.py_name.value, 'image')
            if not isinstance(image, cpi.Image):
                image = cpi.Image(image)
            workspace.image_set.add(group.image_name.value, image)

        # retrieve output objects from the script namespace
        for group in self.output_object_groups:
            objects = self._script_result(
                user_ns, group.py_name.value, 'object')
            if not isinstance(objects, cpo.Objects):
                # anything else is used as the segmentation of a new
                # Objects instance
                new_objects = cpo.Objects()
                new_objects.segmented = objects
                objects = new_objects
            workspace.object_set.add_objects(
                objects, group.objects_name.value)

        # retrieve output measurements from the script namespace
        for group in self.output_measurement_groups:
            measurement = self._script_result(
                user_ns, group.py_name.value, 'measurement')
            feature_name = self._measurement_feature_name(group)
            if group.wants_image.value:
                workspace.measurements.add_image_measurement(
                    feature_name, measurement)
            else:
                workspace.measurements.add_measurement(
                    group.use_object_name.value,
                    feature_name,
                    measurement)

    ################################
    #
    # DISPLAY
    #
    def is_interactive(self):
        '''Return False: this module reports itself as non-interactive.'''
        return False

    def display(self, workspace):
        '''Nothing is displayed by this module.'''
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment