Created
January 20, 2012 23:24
-
-
Save swarbhanu/1650184 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
__author__ = 'Maurice Manning' | |
__license__ = 'Apache 2.0' | |
from interface.services.dm.iingestion_management_service import BaseIngestionManagementService | |
from pyon.core.exception import NotFound | |
from pyon.public import RT, AT, log, IonObject | |
from pyon.public import CFG | |
from ion.services.dm.ingestion.ingestion import Ingestion | |
from pyon.net.channel import SubscriberChannel | |
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient | |
from interface.services.dm.itransform_management_service import TransformManagementServiceClient | |
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient | |
class IngestionManagementService(BaseIngestionManagementService):
    """Manage ingestion configurations and the transforms that execute them.

    Ingestion configurations are stored as RT.IngestionConfiguration resources
    through ``self.clients.resource_registry``; transforms are created and
    activated through ``self.clients.transform_management``.
    """

    # Hard-coded exchange point name. The configuration entry noted by the
    # original author as its source is
    # CFG.exchange_spaces.ioncore.exchange_points.science_data.name
    XP = 'science.data'

    def __init__(self):
        BaseIngestionManagementService.__init__(self)

    def create_ingestion_configuration(self, exchange_point_id='', couch_storage={}, hfd_storage={},
                                       number_of_workers=0, default_policy={}):
        """Setup ingestion workers to ingest all the data from a single exchange point.

        Builds an RT.IngestionConfiguration resource named after the exchange
        point and registers it in the resource registry.

        @param exchange_point_id    str
        @param couch_storage    Unknown
        @param hfd_storage    Unknown
        @param number_of_workers    int
        @param default_policy    Unknown
        @retval ingestion_configuration_id    str
        """
        # create an ingestion_configuration instance and update the registry
        ingestion_configuration = IonObject(RT.IngestionConfiguration, name=exchange_point_id)
        ingestion_configuration.number_of_workers = number_of_workers
        # ``x or {}`` hands every resource its own dict when the (shared,
        # mutable) default argument is in effect, so resources never alias it.
        ingestion_configuration.hfd_storage = hfd_storage or {}
        ingestion_configuration.couch_storage = couch_storage or {}
        ingestion_configuration.default_policy = default_policy or {}

        config_id, config_rev = self.clients.resource_registry.create(ingestion_configuration)

        # record the registry-assigned id and revision on the local object
        ingestion_configuration._id = config_id
        ingestion_configuration._rev = config_rev

        return config_id

    def launch_transforms(self, ingestion_configuration_id, subscription_id, listen_name, output_stream_id):
        """Create and activate one transform per configured ingestion worker.

        @param ingestion_configuration_id    str
        @param subscription_id    id passed to create_transform as in_subscription_id
        @param listen_name    value placed under 'listen_name' in the process configuration
        @param output_stream_id    id passed to create_transform as out_stream_id
        @throws NotFound    if ingestion configuration did not exist
        """
        # Use the service's own clients, as the other methods of this class do.
        tms_client = self.clients.transform_management
        rr_client = self.clients.resource_registry

        # Read the configuration first so that an unknown id fails before any
        # process definition is registered.
        ingestion_configuration = self.read_ingestion_configuration(ingestion_configuration_id)
        num = ingestion_configuration.number_of_workers

        # set up process definition
        process_definition = IonObject(RT.ProcessDefinition, name='first_transform_definition')
        process_definition.executable = {
            'module': 'ion.services.dm.transformation.example.transform_example',
            'class': 'TransformExample',
        }
        process_definition_id, _ = rr_client.create(process_definition)

        configuration = {'process': {'name': 'configuration', 'type': "stream_process", 'listen_name': listen_name}}

        # launch the transforms: exactly ``number_of_workers`` of them
        for _ in range(num):
            transform_id = tms_client.create_transform(in_subscription_id=subscription_id,
                                                       out_stream_id=output_stream_id,
                                                       process_definition_id=process_definition_id,
                                                       configuration=configuration)
            tms_client.activate_transform(transform_id)

    def update_ingestion_configuration(self, ingestion_configuration={}):
        """Change the number of workers or the default policy for ingesting data on each stream

        The object is passed unchanged to the resource registry's update call.

        @param ingestion_configuration    IngestionConfiguration
        """
        log.debug("Updating ingestion configuration")
        self.clients.resource_registry.update(ingestion_configuration)

    def read_ingestion_configuration(self, ingestion_configuration_id=''):
        """Get an existing ingestion configuration object.

        @param ingestion_configuration_id    str
        @retval ingestion_configuration    IngestionConfiguration
        @throws NotFound    if ingestion configuration did not exist
        """
        log.debug("Reading ingestion configuration object id: %s", ingestion_configuration_id)
        ingestion_configuration = self.clients.resource_registry.read(ingestion_configuration_id)
        if ingestion_configuration is None:
            raise NotFound("Ingestion configuration %s does not exist" % ingestion_configuration_id)
        return ingestion_configuration

    def delete_ingestion_configuration(self, ingestion_configuration_id=''):
        """Delete an existing ingestion configuration object.

        @param ingestion_configuration_id    str
        @throws NotFound    if ingestion configuration did not exist
        """
        log.debug("Deleting ingestion configuration: %s", ingestion_configuration_id)
        # read_ingestion_configuration raises NotFound itself when the registry
        # returns nothing, so no second None check is needed here.
        ingestion_configuration = self.read_ingestion_configuration(ingestion_configuration_id)
        self.clients.resource_registry.delete(ingestion_configuration)

    def activate_ingestion_configuration(self, ingestion_configuration_id=''):
        """Activate an ingestion configuration and the transform processes that execute it

        @param ingestion_configuration_id    str
        @retval True once the transform has been activated
        @throws NotFound    The ingestion configuration id did not exist
        """
        log.debug("Activating ingestion configuration")

        transform_ids, _ = self.clients.resource_registry.find_objects(ingestion_configuration_id,
                                                                       AT.hasTransform, RT.Transform, True)
        if not transform_ids:
            raise NotFound("No transforms found for ingestion configuration %s" % ingestion_configuration_id)

        # NOTE(review): only the first associated transform is activated --
        # presumably all workers share one subscription; confirm.
        self.clients.transform_management.activate_transform(transform_ids[0])

        return True

    def deactivate_ingestion_configuration(self, ingestion_configuration_id=''):
        """Deactivate an ingestion configuration and the transform processes that execute it

        Not implemented yet: only logs and returns None.

        @param ingestion_configuration_id    str
        @throws NotFound    The ingestion configuration id did not exist
        """
        log.debug("Deactivating ingestion configuration")
        # TODO: use the deactivate method in transformation management service

    def create_stream_policy(self, stream_id='', archive_data='', archive_metadata=''):
        """Create a policy for a particular stream and associate it to the ingestion configuration for the exchange point the stream is on. (After LCA)

        Not implemented yet: returns None.

        @param stream_id    str
        @param archive_data    str
        @param archive_metadata    str
        @retval ingestion_policy_id    str
        """
        # TODO: return ingestion_policy_id

    def update_stream_policy(self, stream_policy={}):
        """Change the number of workers or the default policy for ingesting data on each stream (After LCA)

        Not implemented yet: returns None.

        @param stream_policy    Unknown
        @throws NotFound    if ingestion configuration did not exist
        """

    def read_stream_policy(self, stream_policy_id=''):
        """Get an existing stream policy object. (After LCA)

        Not implemented yet: returns None.

        @param stream_policy_id    str
        @retval ingestion_configuration    IngestionConfiguration
        @throws NotFound    if ingestion configuration did not exist
        """
        # TODO: return ingestion_configuration

    def delete_stream_policy(self, ingestion_configuration_id=''):
        """Delete an existing stream policy object. (After LCA)

        Not implemented yet: returns None.

        @param ingestion_configuration_id    str
        @throws NotFound    if ingestion configuration did not exist
        """
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment