-
-
Save blazetopher/5542643 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# IPython log file | |
from ion.services.dm.utility.granule_utils import time_series_domain | |
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient | |
from interface.services.dm.idataset_management_service import DatasetManagementServiceClient | |
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient | |
from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient | |
from interface.services.dm.idata_retriever_service import DataRetrieverServiceClient | |
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService | |
from ion.services.dm.utility.granule import RecordDictionaryTool | |
from pyon.container.cc import Container | |
from pyon.ion.stream import StandaloneStreamPublisher, StandaloneStreamSubscriber | |
from ion.services.dm.utility.test.parameter_helper import ParameterHelper | |
from interface.objects import DataProduct | |
from coverage_model import QuantityType, ArrayType, CategoryType, ConstantType | |
import numpy as np | |
import time | |
dset_i = 0 | |
stream_n=0 | |
def load_datasets():
    """Create a stream backed by a new dataset and wire it into ingestion.

    Uses the 'ctd_parsed_param_dict' parameter dictionary and the first
    available ingestion configuration. Increments the module-level dset_i
    counter so successive calls get unique stream/dataset names.

    Returns (stream_id, route, stream_def_id, dataset_id).
    """
    global dset_i
    pubsub = PubsubManagementServiceClient()
    datasets = DatasetManagementServiceClient()
    ingestion = IngestionManagementServiceClient()

    pdict_id = datasets.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub.create_stream_definition('std', parameter_dictionary_id=pdict_id)
    stream_id, route = pubsub.create_stream('instrument_stream_%s' % dset_i, 'xp1',
                                            stream_definition_id=stream_def_id)

    ingest_config_id = ingestion.list_ingestion_configurations(id_only=True)[0]
    tdom, sdom = time_series_domain()
    dataset_id = datasets.create_dataset('instrument_dataset_%s' % dset_i,
                                         parameter_dictionary_id=pdict_id,
                                         spatial_domain=sdom.dump(),
                                         temporal_domain=tdom.dump())
    ingestion.persist_data_stream(stream_id=stream_id,
                                  ingestion_configuration_id=ingest_config_id,
                                  dataset_id=dataset_id)
    datasets.register_dataset(dataset_id)

    dset_i += 1
    return stream_id, route, stream_def_id, dataset_id
def start_stats_listener():
    """Subscribe to IngestionStatus events and print each event's attributes.

    Returns (subscriber, start_result) so the caller can later stop the
    subscriber.
    """
    from pyon.event.event import EventSubscriber

    def cb(*args, **kwargs):
        # First positional argument is the event object; dump its fields.
        # Parenthesized print works under both Python 2 and Python 3
        # (the original bare print statement was Python-2-only).
        print(args[0].__dict__)

    es = EventSubscriber(event_type='IngestionStatus', callback=cb)
    return es, es.start()
def make_pubsub(data_product_id, callback):
    """Create a publisher and a started subscriber on a fresh stream.

    The stream uses the data product's stream definition; `callback` is
    invoked for each message the subscriber receives.

    Returns (publisher, subscriber).
    """
    global stream_n
    pubsub_management = PubsubManagementServiceClient()
    stream_def_id = get_stream_def(data_product_id)
    stream_id, route = pubsub_management.create_stream('test%s' % stream_n, stream_definition_id=stream_def_id, exchange_point='science data')
    publisher = StandaloneStreamPublisher(stream_id, route)
    sub_id = pubsub_management.create_subscription('listener%s' % stream_n, stream_ids=[stream_id])
    pubsub_management.activate_subscription(sub_id)
    subscriber = StandaloneStreamSubscriber('listener%s' % stream_n, callback=callback)
    subscriber.start()
    # BUG FIX: the global counter was declared but never incremented, so a
    # second call would try to re-create 'test0'/'listener0'. Bump it so each
    # call gets unique names.
    stream_n += 1
    return publisher, subscriber
def get_stream_def(data_product_id):
    """Return the id of the first stream definition linked to the data product."""
    registry = Container.instance.resource_registry
    def_ids, _ = registry.find_objects(data_product_id, 'hasStreamDefinition', id_only=True)
    return def_ids[0]
def populate_dataset(dataset_id, hours):
    """Fill the dataset's coverage with `hours` worth of 1 Hz synthetic CTD data."""
    coverage = DatasetManagementService._get_simplex_coverage(dataset_id)
    steps = hours * 3600  # one sample per second
    coverage.insert_timesteps(steps)
    now = time.time()
    # 2208988800 is the NTP epoch offset (seconds between 1900 and 1970).
    coverage.set_parameter_values('time', np.arange(now - steps, now) + 2208988800)
    coverage.set_parameter_values('temp', [280000] * steps)
    coverage.set_parameter_values('conductivity', [100000] * steps)
    coverage.set_parameter_values('pressure', [2789] * steps)
    coverage.set_parameter_values('lat', [45] * steps)
    coverage.set_parameter_values('lon', [-17] * steps)
def get_rdt(pdict_name=''):
    """Build a RecordDictionaryTool on a fresh stream definition.

    pdict_name -- parameter dictionary name; defaults to
                  'ctd_parsed_param_dict' when empty.
    """
    pubsub_management = PubsubManagementServiceClient()
    dataset_management = DatasetManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name or 'ctd_parsed_param_dict', id_only=True)
    # BUG FIX: the original "'%s streamdef' % pdict_name or 'ctd'" bound as
    # "('%s streamdef' % pdict_name) or 'ctd'"; since ' streamdef' is truthy
    # the 'ctd' fallback could never apply. Apply the fallback before
    # formatting instead.
    stream_def_id = pubsub_management.create_stream_definition('%s streamdef' % (pdict_name or 'ctd'), parameter_dictionary_id=pdict_id)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    return rdt
def rdt_for_data_product(data_product_id=''):
    """Build a RecordDictionaryTool matching the data product's stream definition."""
    registry = Container.instance.resource_registry
    stream_defs, _ = registry.find_objects(data_product_id, 'hasStreamDefinition', id_only=True)
    return RecordDictionaryTool(stream_definition_id=stream_defs[0])
def publish_rdt_to_data_product(data_product_id, rdt, connection_id='', connection_index=''):
    """Publish `rdt` as a granule on the data product's first stream.

    connection_id / connection_index are forwarded into the granule for
    ingestion provenance tracking.
    """
    registry = Container.instance.resource_registry
    pubsub = PubsubManagementServiceClient()
    stream_ids, _ = registry.find_objects(data_product_id, 'hasStream', id_only=True)
    stream = stream_ids[0]
    route = pubsub.read_stream_route(stream)
    granule = rdt.to_granule(connection_id=connection_id, connection_index=connection_index)
    StandaloneStreamPublisher(stream, route).publish(granule)
def populate_data_product(data_product_id, hours=1):
    """Fill the data product's dataset with `hours` of 1 Hz synthetic data.

    The temporal parameter gets real wall-clock timestamps (NTP epoch);
    every other parameter is filled by fill_parameter().
    """
    registry = Container.instance.resource_registry
    dataset_ids, _ = registry.find_objects(data_product_id, 'hasDataset', id_only=True)
    coverage = DatasetManagementService._get_simplex_coverage(dataset_ids[0])
    steps = hours * 3600
    coverage.insert_timesteps(steps)
    now = time.time()
    time_name = coverage.temporal_parameter_name
    for name in coverage.list_parameters():
        if name == time_name:
            # 2208988800 = NTP epoch offset (1900 -> 1970) in seconds.
            coverage.set_parameter_values('time', np.arange(now - steps, now) + 2208988800)
        else:
            fill_parameter(coverage, name, steps)
def fill_parameter(cov, parameter, dt):
    """Populate one coverage parameter with dt synthetic values.

    cov       -- coverage object to write into
    parameter -- name of the parameter to fill
    dt        -- number of timesteps to fill

    Quantity parameters get sinusoidal ranges (or fixed lat/lon), ArrayType
    gets a repeated 0..9 row, CategoryType gets the first category repeated,
    and ConstantType gets a single typed constant.
    """
    context = cov.get_parameter_context(parameter)
    t = np.arange(dt)
    if isinstance(context.param_type, QuantityType):
        if parameter == 'temp':
            cov.set_parameter_values(parameter, float_range(1e5, 4e5, t))
        elif parameter == 'conductivity':
            cov.set_parameter_values(parameter, float_range(510000, 550000, t))
        elif parameter == 'pressure':
            cov.set_parameter_values(parameter, float_range(50, 30380, t))
        elif parameter == 'lat':
            # BUG FIX: '[45] * t' multiplied the list elementwise by the
            # ndarray t (yielding 0, 45, 90, ...) instead of repeating the
            # constant. Repeat by the integer dt, matching populate_dataset.
            cov.set_parameter_values(parameter, [45] * dt)
        elif parameter == 'lon':
            # BUG FIX: same list-times-ndarray issue as 'lat' above.
            cov.set_parameter_values(parameter, [-71] * dt)
        else:
            cov.set_parameter_values(parameter, np.sin(np.pi * 2 * t / 60))
    elif isinstance(context.param_type, ArrayType):
        values = np.array([range(10)] * dt)
        cov.set_parameter_values(parameter, values)
    elif isinstance(context.param_type, CategoryType):
        # BUG FIX: '* t' with an ndarray raised/misbehaved for a list of
        # strings; repeat the first category dt times instead.
        cov.set_parameter_values(parameter, [context.categories.keys()[0]] * dt)
    elif isinstance(context.param_type, ConstantType):
        cov.set_parameter_values(parameter, np.dtype(context.param_type.value_encoding).type(1))
def float_range(minvar, maxvar, t):
    """Map timesteps `t` onto a sine wave oscillating between minvar and maxvar.

    The wave has a 60-step period and is centered midway between the bounds,
    with amplitude (maxvar - minvar) / 2.
    """
    amplitude = (maxvar - minvar) / 2
    midpoint = minvar + amplitude
    return midpoint + amplitude * np.sin(2 * np.pi * t / 60)
def make_data_product(pdict_name, dp_name, available_fields=None):
    """Create and persist a data product over the named parameter dictionary.

    pdict_name       -- name of an existing parameter dictionary
    dp_name          -- name for the new data product (and its stream def)
    available_fields -- optional list restricting the stream definition's
                        fields; None or empty means all fields.

    Returns the new data product id.
    """
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name, id_only=True)
    # BUG FIX: default changed from a shared mutable [] to None;
    # 'available_fields or None' preserves the original behavior of passing
    # None when the argument is empty.
    stream_def_id = pubsub_management.create_stream_definition('%s stream_def' % dp_name, parameter_dictionary_id=pdict_id, available_fields=available_fields or None)
    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(name=dp_name)
    dp_obj.temporal_domain = tdom.dump()
    dp_obj.spatial_domain = sdom.dump()
    data_product_id = data_product_management.create_data_product(dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    return data_product_id
def create_extended_data_product():
    """Register the 'extended_parsed' parameter dictionary and build a data product on it."""
    dataset_management = DatasetManagementServiceClient()
    helper = ParameterHelper(dataset_management, lambda *args, **kwargs: None)
    helper.create_extended_parsed()
    return make_data_product('extended_parsed', 'extended data product')
def create_data_product(pdict_name):
    """Create and persist a data product over the named parameter dictionary.

    Increments the module-level dset_i counter.
    Returns (data_product_id, stream_id, route, stream_def_id, dataset_id).
    """
    global dset_i
    datasets = DatasetManagementServiceClient()
    pubsub = PubsubManagementServiceClient()
    products = DataProductManagementServiceClient()
    registry = Container.instance.resource_registry

    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(
        name='%s data product' % pdict_name,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom.dump(),
        spatial_domain=sdom.dump())

    pdict_id = datasets.read_parameter_dictionary_by_name(pdict_name, id_only=True)
    stream_def_id = pubsub.create_stream_definition(name='%s def' % pdict_name, parameter_dictionary_id=pdict_id)
    data_product_id = products.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    products.activate_data_product_persistence(data_product_id)

    # Look up the stream and dataset that persistence activation created.
    stream_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub.read_stream_route(stream_id)
    dataset_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)

    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_ids[0]
def load_data_product_prime():
    """Create a persisted instrument data product on the 'ctd_LC_TEST' dictionary.

    Same flow as load_data_product() but with the 'ctd_LC_TEST' parameter
    dictionary. Increments the module-level dset_i counter.
    Returns (data_product_id, stream_id, route, stream_def_id, dataset_id).
    """
    global dset_i
    datasets = DatasetManagementServiceClient()
    pubsub = PubsubManagementServiceClient()
    products = DataProductManagementServiceClient()
    registry = Container.instance.resource_registry

    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(
        name='instrument_data_product_%i' % dset_i,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom.dump(),
        spatial_domain=sdom.dump())

    pdict_id = datasets.read_parameter_dictionary_by_name('ctd_LC_TEST', id_only=True)
    stream_def_id = pubsub.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = products.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    products.activate_data_product_persistence(data_product_id)

    # Look up the stream and dataset that persistence activation created.
    stream_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub.read_stream_route(stream_id)
    dataset_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)

    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_ids[0]
def load_data_product():
    """Create a persisted instrument data product on 'ctd_parsed_param_dict'.

    Increments the module-level dset_i counter so names stay unique.
    Returns (data_product_id, stream_id, route, stream_def_id, dataset_id).
    """
    global dset_i
    datasets = DatasetManagementServiceClient()
    pubsub = PubsubManagementServiceClient()
    products = DataProductManagementServiceClient()
    registry = Container.instance.resource_registry

    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(
        name='instrument_data_product_%i' % dset_i,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom.dump(),
        spatial_domain=sdom.dump())

    pdict_id = datasets.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = products.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    products.activate_data_product_persistence(data_product_id)

    # Look up the stream and dataset that persistence activation created.
    stream_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub.read_stream_route(stream_id)
    dataset_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)

    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_ids[0]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyon.ion.stream import StandaloneStreamPublisher | |
from ion.services.dm.utility.granule import RecordDictionaryTool | |
import gevent | |
import numpy as np | |
class Streamer(object):
    """Background publisher that emits one (time, temp) record per interval.

    A greenlet is spawned at construction time; call close() to stop it.
    Each published granule carries a single monotonically increasing sample
    value in both the 'time' and 'temp' fields.
    """

    def __init__(self, stream_id, route, stream_def_id, interval=1):
        self.publisher = StandaloneStreamPublisher(stream_id, route)
        self.i = 0
        self.interval = interval
        self.stream_def_id = stream_def_id
        self.finished = False
        # gevent greenlets don't run until the next context switch, so the
        # flags above are in place before run() first checks them.
        self.g = gevent.spawn(self.run)

    def run(self):
        # Loop until close() flips the flag; sleep first so the initial
        # granule is published one interval after construction.
        while not self.finished:
            gevent.sleep(self.interval)
            rdt = RecordDictionaryTool(stream_definition_id=self.stream_def_id)
            rdt['time'] = np.array([self.i])
            rdt['temp'] = np.array([self.i])
            self.i += 1
            self.publisher.publish(rdt.to_granule())

    def close(self):
        """Signal the greenlet to stop, wait up to 5 s, then force-kill it."""
        self.finished = True
        self.g.join(5)
        self.g.kill()
def quick_write(stream_id, route, stream_def_id, values=1000):
    """Publish one granule containing `values` sequential time/temp samples.

    stream_id / route  -- publishing target
    stream_def_id      -- stream definition used to build the record dictionary
    values             -- number of samples (0, 1, 2, ...) to include
    """
    publisher = StandaloneStreamPublisher(stream_id, route)
    # (removed a no-op 'stream_def_id = stream_def_id' self-assignment)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.arange(values)
    rdt['temp'] = np.arange(values)
    publisher.publish(rdt.to_granule())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Demo driver: create a persisted data product with its stream and dataset,
# then publish 1000 sequential samples into the stream as a single granule.
from load_datasets import load_data_product
from streamer import quick_write
dp_id, stream_id, route, stream_def_id, dataset_id = load_data_product()
quick_write(stream_id, route, stream_def_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment