Last active
December 20, 2015 11:29
-
-
Save wasade/6123807 to your computer and use it in GitHub Desktop.
Prototype delayed IO generalized containers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
__author__ = "Daniel McDonald" | |
__copyright__ = "Copyright 2013, The QIIME Project" | |
__credits__ = ["Daniel McDonald", "Greg Caporaso", "Doug Wendel", | |
"Jai Ram Rideout"] | |
__license__ = "GPL" | |
__version__ = "0.1.0-dev" | |
__maintainer__ = "Daniel McDonald" | |
__email__ = "[email protected]" | |
from biom.parse import parse_biom_table | |
class ContainerError(Exception): | |
pass | |
class CannotReadError(ContainerError): | |
pass | |
class CannotWriteError(ContainerError): | |
pass | |
class Container(object): | |
_reserved = set(['_reserved','TypeName', 'Info']) | |
TypeName = "Container" | |
Info = None | |
def __init__(self, *args, **kwargs): | |
if 'Info' in kwargs: | |
super(Container, self).__setattr__('Info', kwargs['Info']) | |
def _load_if_needed(self): | |
pass | |
def __getattr__(self, attr): | |
if attr in super(Container, self).__getattribute__('_reserved'): | |
return super(Container, self).__getattribute__(attr) | |
self._load_if_needed() | |
return getattr(self._object, attr) | |
def __setattr__(self, attr, val): | |
if attr in super(Container, self).__getattribute__('_reserved'): | |
return super(Container, self).__setattr__(attr, val) | |
self._load_if_needed() | |
setattr(self._object, attr, val) | |
def __hasattr__(self, attr): | |
if attr in super(Container, self).__getattribute__('_reserved'): | |
return True | |
self._load_if_needed() | |
return hasattr(self._object, attr) | |
class IOContainer(Container): | |
_reserved = set(['_reserved', 'TypeName', '_reader', '_writer', | |
'_object', 'InPath', 'OutPath','read', 'write', | |
'_load_if_needed', 'Info']) | |
TypeName = "IOContainer" | |
_object = None | |
InPath = None | |
OutPath = None | |
Info = None | |
def __init__(self, *args, **kwargs): | |
if 'Object' in kwargs: | |
super(IOContainer, self).__setattr__('_object', kwargs['Object']) | |
if 'InPath' in kwargs: | |
super(IOContainer, self).__setattr__('InPath', kwargs['InPath']) | |
if 'OutPath' in kwargs: | |
super(IOContainer, self).__setattr__('OutPath', kwargs['OutPath']) | |
super(IOContainer, self).__init__(*args, **kwargs) | |
def _load_if_needed(self): | |
if self._object is None: | |
if self.InPath is not None: | |
self._object = self._reader(self.InPath) | |
else: | |
raise CannotReadError("No object and InPath is None!") | |
def write(self): | |
if self._object is not None: | |
if self.OutPath is None: | |
raise CannotWriteError("OutPath is None!") | |
self._writer(self.OutPath) | |
def read(self): | |
if self._object is None: | |
if self.InPath is None: | |
raise CannotReadError("InPath is None!") | |
self._object = self._reader(self.InPath) | |
class QIIMEInt(Container, int): | |
TypeName = "Int" | |
class QIIMEFloat(Container, float): | |
TypeName = "Float" | |
class QIIMEStr(Container, str): | |
TypeName = "String" | |
def write_biom_table(path, s): | |
f = open(path, 'w') | |
f.write(s) | |
f.close() | |
class BIOMTable(IOContainer): | |
TypeName = "BIOMTable" | |
_reader = lambda self, x: parse_biom_table(open(x)) | |
_writer = lambda self, x: write_biom_table(x, | |
self.getBiomFormatJsonString('asd')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment