Load datasets and streamer utilities
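Python 2 utilities for the OOI ION (pyon) stack: create data products and datasets, wire streams into ingestion, populate coverages with synthetic CTD values, load QC lookup tables, and publish granules on an interval for testing.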
load_datasets.py
# IPython log file

from ion.services.dm.utility.granule_utils import time_series_domain
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
from interface.services.dm.idataset_management_service import DatasetManagementServiceClient
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient
from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient
from interface.services.dm.idata_retriever_service import DataRetrieverServiceClient
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
from interface.services.sa.idata_acquisition_management_service import DataAcquisitionManagementServiceClient
from ion.services.dm.utility.granule import RecordDictionaryTool
from pyon.container.cc import Container
from pyon.ion.stream import StandaloneStreamPublisher, StandaloneStreamSubscriber
from ion.services.dm.utility.test.parameter_helper import ParameterHelper
from interface.objects import DataProduct
from coverage_model import QuantityType, ArrayType, CategoryType, ConstantType, SparseConstantType
import numpy as np
import time
from pyon.public import PRED
import gevent

dset_i = 0
stream_n = 0
def load_datasets():
    global dset_i
    pubsub_management = PubsubManagementServiceClient()
    dataset_management = DatasetManagementServiceClient()
    ingestion_management = IngestionManagementServiceClient()
    # Build a stream definition from the canonical CTD parameter dictionary,
    # create a stream and a dataset, then wire the stream into the first
    # available ingestion configuration so published granules get persisted.
    pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub_management.create_stream_definition('std', parameter_dictionary_id=pdict_id)
    stream_id, route = pubsub_management.create_stream('instrument_stream_%s' % dset_i, 'xp1', stream_definition_id=stream_def_id)
    ingest_config_id = ingestion_management.list_ingestion_configurations(id_only=True)[0]
    tdom, sdom = time_series_domain()
    dataset_id = dataset_management.create_dataset('instrument_dataset_%s' % dset_i, parameter_dictionary_id=pdict_id, spatial_domain=sdom.dump(), temporal_domain=tdom.dump())
    ingestion_management.persist_data_stream(stream_id=stream_id, ingestion_configuration_id=ingest_config_id, dataset_id=dataset_id)
    dataset_management.register_dataset(dataset_id)
    dset_i += 1
    return stream_id, route, stream_def_id, dataset_id
def start_stats_listener():
    from pyon.event.event import EventSubscriber
    def cb(*args, **kwargs):
        # Dump each IngestionStatus event's attributes as it arrives.
        print args[0].__dict__
    es = EventSubscriber(event_type='IngestionStatus', callback=cb)
    return es, es.start()
def make_pubsub(data_product_id, callback):
    global stream_n
    pubsub_management = PubsubManagementServiceClient()
    stream_def_id = get_stream_def(data_product_id)
    stream_id, route = pubsub_management.create_stream('test%s' % stream_n, stream_definition_id=stream_def_id, exchange_point='science data')
    publisher = StandaloneStreamPublisher(stream_id, route)
    sub_id = pubsub_management.create_subscription('listener%s' % stream_n, stream_ids=[stream_id])
    pubsub_management.activate_subscription(sub_id)
    subscriber = StandaloneStreamSubscriber('listener%s' % stream_n, callback=callback)
    subscriber.start()
    stream_n += 1  # keep stream and subscription names unique across calls
    return publisher, subscriber
def get_stream_def(data_product_id):
    resource_registry = Container.instance.resource_registry
    stream_def_ids, _ = resource_registry.find_objects(data_product_id, 'hasStreamDefinition', id_only=True)
    return stream_def_ids[0]
def populate_dataset(dataset_id, hours):
    cov = DatasetManagementService._get_simplex_coverage(dataset_id)
    dt = int(hours * 3600)  # one timestep per second
    cov.insert_timesteps(dt)
    now = time.time()
    # 2208988800 shifts the Unix epoch (1970) to the NTP epoch (1900)
    cov.set_parameter_values('time', np.arange(now - dt, now) + 2208988800)
    cov.set_parameter_values('temp', [280000] * dt)
    cov.set_parameter_values('conductivity', [100000] * dt)
    cov.set_parameter_values('pressure', [2789] * dt)
    cov.set_parameter_values('lat', [45] * dt)
    cov.set_parameter_values('lon', [-17] * dt)
def load_qc_tables():
    resource_registry = Container.instance.resource_registry
    data_acquisition = DataAcquisitionManagementServiceClient()
    grt_path = 'res/preload/r2_ioc/attachments/Data_QC_Lookup_Table_Global_Range_Test_2013-2-21.csv'
    grt_parser_ids, _ = resource_registry.find_resources(restype='Parser', name='Global Range Test', id_only=True)
    grt_parser_id = grt_parser_ids[0]
    data_acquisition.parse_qc_reference(grt_parser_id, open(grt_path).read())
    spk_path = 'res/preload/r2_ioc/attachments/Data_QC_Lookup_Table_spike_test_updated.csv'
    spk_parser_ids, _ = resource_registry.find_resources(restype='Parser', name='Spike Test', id_only=True)
    spk_parser_id = spk_parser_ids[0]
    data_acquisition.parse_qc_reference(spk_parser_id, open(spk_path).read())
def get_rdt(pdict_name='', config=None):
    config = config or {}  # avoid a shared mutable default argument
    pubsub_management = PubsubManagementServiceClient()
    dataset_management = DatasetManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name or 'ctd_parsed_param_dict', id_only=True)
    # parenthesize so the fallback applies to the name, not the formatted string
    stream_def_id = pubsub_management.create_stream_definition('%s streamdef' % (pdict_name or 'ctd'), parameter_dictionary_id=pdict_id, stream_configuration=config)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    return rdt
def rdt_for_data_product(data_product_id=''):
    resource_registry = Container.instance.resource_registry
    stream_def_ids, _ = resource_registry.find_objects(data_product_id, 'hasStreamDefinition', id_only=True)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_ids[0])
    return rdt
def publish_rdt_to_data_product(data_product_id, rdt, connection_id='', connection_index=''):
    resource_registry = Container.instance.resource_registry
    pubsub_management = PubsubManagementServiceClient()
    stream_ids, _ = resource_registry.find_objects(data_product_id, 'hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    publisher = StandaloneStreamPublisher(stream_id, route)
    publisher.publish(rdt.to_granule(connection_id=connection_id, connection_index=connection_index))
def populate_data_product(data_product_id, hours=1):
    resource_registry = Container.instance.resource_registry
    dataset_ids, _ = resource_registry.find_objects(data_product_id, 'hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    cov = DatasetManagementService._get_simplex_coverage(dataset_id)
    rdt = rdt_for_data_product(data_product_id)
    rdt['time'] = [0]
    rdt.fetch_lookup_values()
    for field in rdt._lookup_values():
        print 'looking at', field
        if isinstance(rdt.context(field).param_type, SparseConstantType):
            print '%s is sparse constant type' % field
            if rdt[field] is not None:  # 'is not None' avoids ambiguous ndarray truth tests
                print 'Setting %s in cov' % field
                cov.set_parameter_values(field, value=rdt[field])
    dt = int(hours * 3600)
    cov.insert_timesteps(dt)
    now = time.time()
    tparam = cov.temporal_parameter_name
    for param in cov.list_parameters():
        if param == tparam:
            cov.set_parameter_values('time', np.arange(now - dt, now) + 2208988800)
        else:
            fill_parameter(cov, param, dt)
def fill_tmpsf(cov):
    cov.insert_timesteps(3600)
    now = time.time()
    ntp_now = now + 2208988800  # add (not subtract) to convert Unix time to NTP, as elsewhere in this file
    cov.set_parameter_values('time', np.arange(ntp_now - 3600, ntp_now))
    temp_vals = np.array([[0, 1, 2, 3, 4]] * 3600)
    cov.set_parameter_values('temperature', temp_vals)
def fill_parameter(cov, parameter, dt):
    context = cov.get_parameter_context(parameter)
    t = np.arange(dt)
    if isinstance(context.param_type, QuantityType):
        if parameter == 'temp':
            cov.set_parameter_values(parameter, float_range(-2, 50, t))
        elif parameter == 'conductivity':
            cov.set_parameter_values(parameter, float_range(0, 200, t))
        elif parameter == 'pressure':
            cov.set_parameter_values(parameter, float_range(0, 5000, t))
        elif parameter == 'lat':
            cov.set_parameter_values(parameter, [45] * dt)
        elif parameter == 'lon':
            cov.set_parameter_values(parameter, [-71] * dt)
        else:
            cov.set_parameter_values(parameter, np.sin(np.pi * 2 * t / 60))
    elif isinstance(context.param_type, ArrayType):
        values = np.array([range(10)] * dt)
        cov.set_parameter_values(parameter, values)
    elif isinstance(context.param_type, CategoryType):
        # repeat by the integer count dt, not the ndarray t
        cov.set_parameter_values(parameter, [context.categories.keys()[0]] * dt)
    elif isinstance(context.param_type, ConstantType):
        cov.set_parameter_values(parameter, np.dtype(context.param_type.value_encoding).type(1))
def float_range(minvar, maxvar, t):
    # 2.0 forces float division under Python 2; (maxvar - minvar) / 2 truncates for ints
    a = (maxvar - minvar) / 2.0
    return np.sin(np.pi * 2 * t / 60) * a + (minvar + a)
def make_data_product(pdict_name, dp_name, available_fields=None):
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name, id_only=True)
    stream_def_id = pubsub_management.create_stream_definition('%s stream_def' % dp_name, parameter_dictionary_id=pdict_id, available_fields=available_fields or None)
    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(name=dp_name)
    dp_obj.temporal_domain = tdom.dump()
    dp_obj.spatial_domain = sdom.dump()
    data_product_id = data_product_management.create_data_product(dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    return data_product_id
def create_extended_data_product():
    dataset_management = DatasetManagementServiceClient()
    ph = ParameterHelper(dataset_management, lambda *args, **kwargs: None)
    ph.create_extended_parsed()
    dp_id = make_data_product('extended_parsed', 'extended data product')
    return dp_id
def create_data_product(pdict_name, reference_designator=''):
    global dset_i
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    resource_registry = Container.instance.resource_registry
    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(
        name='%s data product' % pdict_name,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom.dump(),
        spatial_domain=sdom.dump())
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name, id_only=True)
    stream_def_id = pubsub_management.create_stream_definition(name='%s def' % pdict_name, parameter_dictionary_id=pdict_id, stream_configuration={'reference_designator': reference_designator})
    data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    stream_ids, _ = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    dataset_ids, _ = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
def load_data_product_prime():
    # Same flow as load_data_product below, but built on the 'ctd_LC_TEST'
    # parameter dictionary.
    global dset_i
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    resource_registry = Container.instance.resource_registry
    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(
        name='instrument_data_product_%i' % dset_i,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom.dump(),
        spatial_domain=sdom.dump())
    pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_LC_TEST', id_only=True)
    stream_def_id = pubsub_management.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    stream_ids, _ = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    dataset_ids, _ = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
def load_data_product():
    global dset_i
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    resource_registry = Container.instance.resource_registry
    tdom, sdom = time_series_domain()
    dp_obj = DataProduct(
        name='instrument_data_product_%i' % dset_i,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom.dump(),
        spatial_domain=sdom.dump())
    pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub_management.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    stream_ids, _ = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    dataset_ids, _ = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
class Streamer(object):
    def __init__(self, data_product_id, interval=1):
        self.resource_registry = Container.instance.resource_registry
        self.pubsub_management = PubsubManagementServiceClient()
        self.data_product_id = data_product_id
        self.i = 0
        self.interval = interval
        self.finished = False
        self.g = gevent.spawn(self.run)

    def run(self):
        # Publish one synthetic CTD granule per interval until close() is called.
        while not self.finished:
            gevent.sleep(self.interval)
            rdt = rdt_for_data_product(self.data_product_id)
            now = time.time()
            rdt['time'] = np.array([now + 2208988800])
            rdt['temp'] = self.float_range(10, 14, np.array([now]))
            rdt['pressure'] = self.float_range(11, 12, np.array([now]))
            rdt['lat'] = [41.205]
            rdt['lon'] = [-71.74]
            rdt['conductivity'] = self.float_range(3.3, 3.5, np.array([now]))
            rdt['driver_timestamp'] = np.array([now + 2208988800])
            rdt['preferred_timestamp'] = ['driver_timestamp']
            publish_rdt_to_data_product(self.data_product_id, rdt)
            self.i += 1

    def close(self):
        self.finished = True
        self.g.join(5)
        self.g.kill()

    def start(self):
        self.finished = False
        self.g = gevent.spawn(self.run)

    @classmethod
    def float_range(cls, minvar, maxvar, t):
        '''
        Produces a signal with values between minvar and maxvar
        at a frequency of 1/60 Hz centered at the midpoint
        between minvar and maxvar.

        This method provides a deterministic function that
        varies over time and is sinusoidal when graphed.
        '''
        a = (maxvar - minvar) / 2.0  # 2.0: avoid integer division under Python 2
        return np.sin(np.pi * 2 * t / 60) * a + (minvar + a)
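A minimal driver sketch, assuming a running pyon container with the standard preload (the 'ctd_parsed_param_dict' parameter dictionary and an ingestion configuration); Streamer here is the data-product variant defined above:

# Sketch only: requires a live container; all names come from this file.
data_product_id, stream_id, route, stream_def_id, dataset_id = load_data_product()
s = Streamer(data_product_id, interval=2)  # publish a synthetic CTD granule every 2 s
gevent.sleep(10)                           # let a few granules reach ingestion
s.close()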
streamer.py
from pyon.ion.stream import StandaloneStreamPublisher
from ion.services.dm.utility.granule import RecordDictionaryTool
import gevent
import numpy as np

class Streamer(object):
    def __init__(self, stream_id, route, stream_def_id, interval=1):
        self.publisher = StandaloneStreamPublisher(stream_id, route)
        self.i = 0
        self.interval = interval
        self.stream_def_id = stream_def_id
        self.finished = False  # set before spawning so run() sees a defined flag
        self.g = gevent.spawn(self.run)

    def run(self):
        while not self.finished:
            gevent.sleep(self.interval)
            rdt = RecordDictionaryTool(stream_definition_id=self.stream_def_id)
            rdt['time'] = np.array([self.i])
            rdt['temp'] = np.array([self.i])
            self.i += 1
            self.publisher.publish(rdt.to_granule())

    def close(self):
        self.finished = True
        self.g.join(5)
        self.g.kill()

def quick_write(stream_id, route, stream_def_id, values=1000):
    # Publish a single granule containing `values` samples.
    publisher = StandaloneStreamPublisher(stream_id, route)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.arange(values)
    rdt['temp'] = np.arange(values)
    publisher.publish(rdt.to_granule())
Usage:
from load_datasets import load_data_product
from streamer import quick_write

dp_id, stream_id, route, stream_def_id, dataset_id = load_data_product()
quick_write(stream_id, route, stream_def_id)
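The interval publisher from streamer.py can be driven the same way; a sketch under the same container assumptions, using load_datasets() from above:

from load_datasets import load_datasets
from streamer import Streamer
import gevent

stream_id, route, stream_def_id, dataset_id = load_datasets()
s = Streamer(stream_id, route, stream_def_id, interval=1)  # one granule per second
gevent.sleep(5)
s.close()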