Created
February 11, 2013 18:08
-
-
Save blazetopher/4756260 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
@package | |
@file data_proc_poc | |
@author Christopher Mueller | |
@brief | |
""" | |
# IPython log file | |
from ion.services.dm.utility.granule_utils import time_series_domain | |
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient | |
from interface.services.dm.idataset_management_service import DatasetManagementServiceClient | |
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient | |
from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient | |
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService | |
from pyon.container.cc import Container | |
from interface.objects import DataProduct | |
import numpy as np | |
from coverage_model import * | |
import time | |
dset_i = 0 | |
def load_datasets(): | |
global dset_i | |
pubsub_management = PubsubManagementServiceClient() | |
dataset_management = DatasetManagementServiceClient() | |
ingestion_management = IngestionManagementServiceClient() | |
param_contexts = create_params() | |
context_ids = [dataset_management.create_parameter_context(x.name, x.dump()) for x in param_contexts] | |
pdict_id = dataset_management.create_parameter_dictionary('poc_dict', context_ids, temporal_context='poc_TIME') | |
stream_def_id = pubsub_management.create_stream_definition('std', parameter_dictionary_id=pdict_id) | |
stream_id, route = pubsub_management.create_stream('instrument_stream_%s'%dset_i, 'xp1', stream_definition_id=stream_def_id) | |
ingest_config_id = ingestion_management.list_ingestion_configurations(id_only=True)[0] | |
tdom, sdom = time_series_domain() | |
dataset_id = dataset_management.create_dataset('instrument_dataset_%s'%dset_i, parameter_dictionary_id=pdict_id, spatial_domain=sdom.dump(), temporal_domain=tdom.dump()) | |
ingestion_management.persist_data_stream(stream_id=stream_id, ingestion_configuration_id=ingest_config_id, dataset_id=dataset_id) | |
dataset_management.register_dataset(dataset_id) | |
dset_i +=1 | |
return stream_id, route, stream_def_id, dataset_id | |
def start_stats_listener(): | |
from pyon.event.event import EventSubscriber | |
def cb(*args, **kwargs): | |
print args[0].__dict__ | |
es = EventSubscriber(event_type='IngestionStatus', callback=cb) | |
return es, es.start() | |
def populate_dataset(dataset_id, hours): | |
cov = DatasetManagementService._get_coverage(dataset_id) | |
dt = hours * 3600 | |
cov.insert_timesteps(dt) | |
now = time.time() | |
cov.set_parameter_values('poc_TIME', np.arange(now - dt, now) + 2208988800) | |
cov.set_parameter_values('poc_LAT', 41.5) | |
cov.set_parameter_values('poc_LON', -71.5) | |
# SBE 37IM - temperature in 't_dec' example range between 280000 and 350000 | |
cov.set_parameter_values('poc_TEMPWAT_L0', value=np.random.random_sample(dt)*(350000-280000)+280000) | |
# SBE 37IM - conductivity, ranging between 100000 & 750000 (not 0 because never 0 in seawater) | |
cov.set_parameter_values('poc_CONDWAT_L0', value=np.random.random_sample(dt)*(750000-100000)+100000) | |
# SBE 37IM - pressure, ranging between 2789 and 10000 (couldn't find a range in the DPS, this seems reasonable!) | |
cov.set_parameter_values('poc_PRESWAT_L0', value=np.random.random_sample(dt)*(10000-2789)+2789) | |
def load_data_product(): | |
global dset_i | |
dataset_management = DatasetManagementServiceClient() | |
pubsub_management = PubsubManagementServiceClient() | |
data_product_management = DataProductManagementServiceClient() | |
resource_registry = Container.instance.resource_registry | |
tdom, sdom = time_series_domain() | |
tdom = tdom.dump() | |
sdom = sdom.dump() | |
dp_obj = DataProduct( | |
name='instrument_data_product_%i' % dset_i, | |
description='ctd stream test', | |
processing_level_code='Parsed_Canonical', | |
temporal_domain = tdom, | |
spatial_domain = sdom) | |
pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True) | |
stream_def_id = pubsub_management.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id) | |
data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id) | |
data_product_management.activate_data_product_persistence(data_product_id) | |
stream_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True) | |
stream_id = stream_ids[0] | |
route = pubsub_management.read_stream_route(stream_id) | |
dataset_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True) | |
dataset_id = dataset_ids[0] | |
dset_i+=1 | |
return data_product_id, stream_id, route, stream_def_id, dataset_id | |
def create_params(): | |
contexts = [] | |
t_ctxt = ParameterContext('poc_TIME', param_type=QuantityType(value_encoding=np.dtype('int64'))) | |
t_ctxt.uom = 'seconds since 01-01-1900' | |
contexts.append(t_ctxt) | |
lat_ctxt = ParameterContext('poc_LAT', param_type=ConstantType(QuantityType(value_encoding=np.dtype('float32'))), fill_value=-9999) | |
lat_ctxt.axis = AxisTypeEnum.LAT | |
lat_ctxt.uom = 'degree_north' | |
contexts.append(lat_ctxt) | |
lon_ctxt = ParameterContext('poc_LON', param_type=ConstantType(QuantityType(value_encoding=np.dtype('float32'))), fill_value=-9999) | |
lon_ctxt.axis = AxisTypeEnum.LON | |
lon_ctxt.uom = 'degree_east' | |
contexts.append(lon_ctxt) | |
# Independent Parameters | |
# Temperature - values expected to be the decimal results of conversion from hex | |
temp_ctxt = ParameterContext('poc_TEMPWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999) | |
temp_ctxt.uom = 'deg_C' | |
contexts.append(temp_ctxt) | |
# Conductivity - values expected to be the decimal results of conversion from hex | |
cond_ctxt = ParameterContext('poc_CONDWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999) | |
cond_ctxt.uom = 'S m-1' | |
contexts.append(cond_ctxt) | |
# Pressure - values expected to be the decimal results of conversion from hex | |
press_ctxt = ParameterContext('poc_PRESWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999) | |
press_ctxt.uom = 'dbar' | |
contexts.append(press_ctxt) | |
# Dependent Parameters | |
# TEMPWAT_L1 = (TEMPWAT_L0 / 10000) - 10 | |
tl1_func = '(T_L0 / 10000) - 10' | |
tl1_pmap = {'T_L0':'poc_TEMPWAT_L0'} | |
expr = NumexprExpression(tl1_func, tl1_pmap) | |
tempL1_ctxt = ParameterContext('poc_TEMPWAT_L1', param_type=ParameterFunctionType(expression=expr), variability=VariabilityEnum.TEMPORAL) | |
tempL1_ctxt.uom = 'deg_C' | |
contexts.append(tempL1_ctxt) | |
# CONDWAT_L1 = (CONDWAT_L0 / 100000) - 0.5 | |
cl1_func = '(C_L0 / 100000) - 0.5' | |
cl1_pmap = {'C_L0':'poc_CONDWAT_L0'} | |
expr = NumexprExpression(cl1_func, cl1_pmap) | |
condL1_ctxt = ParameterContext('poc_CONDWAT_L1', param_type=ParameterFunctionType(expression=expr), variability=VariabilityEnum.TEMPORAL) | |
condL1_ctxt.uom = 'S m-1' | |
contexts.append(condL1_ctxt) | |
# Equation uses p_range, which is a calibration coefficient - Fixing to 679.34040721 | |
# PRESWAT_L1 = (PRESWAT_L0 * p_range / (0.85 * 65536)) - (0.05 * p_range) | |
pl1_func = '(P_L0 * 679.34040721 / (0.85 * 65536)) - (0.05 * 679.34040721)' | |
pl1_pmap = {'P_L0':'poc_PRESWAT_L0'} | |
expr = NumexprExpression(pl1_func, pl1_pmap) | |
presL1_ctxt = ParameterContext('poc_PRESWAT_L1', param_type=ParameterFunctionType(expression=expr), variability=VariabilityEnum.TEMPORAL) | |
presL1_ctxt.uom = 'S m-1' | |
contexts.append(presL1_ctxt) | |
# Density & practical salinity calucluated using the Gibbs Seawater library - available via python-gsw project: | |
# https://code.google.com/p/python-gsw/ & http://pypi.python.org/pypi/gsw/3.0.1 | |
# PRACSAL = gsw.SP_from_C((CONDWAT_L1 * 10), TEMPWAT_L1, PRESWAT_L1) | |
owner = 'gsw' | |
sal_func = 'SP_from_C' | |
sal_arglist = [NumexprExpression('C*10', {'C':'poc_CONDWAT_L1'}), 'poc_TEMPWAT_L1', 'poc_PRESWAT_L1'] | |
sal_kwargmap = None | |
expr = PythonExpression(owner, sal_func, sal_arglist, sal_kwargmap) | |
sal_ctxt = ParameterContext('poc_PRACSAL', param_type=ParameterFunctionType(expr), variability=VariabilityEnum.TEMPORAL) | |
sal_ctxt.uom = 'g kg-1' | |
contexts.append(sal_ctxt) | |
# absolute_salinity = gsw.SA_from_SP(PRACSAL, PRESWAT_L1, longitude, latitude) | |
# conservative_temperature = gsw.CT_from_t(absolute_salinity, TEMPWAT_L1, PRESWAT_L1) | |
# DENSITY = gsw.rho(absolute_salinity, conservative_temperature, PRESWAT_L1) | |
owner = 'gsw' | |
abs_sal_expr = PythonExpression(owner, 'SA_from_SP', ['poc_PRACSAL', 'poc_PRESWAT_L1', 'poc_LON','poc_LAT'], None) | |
cons_temp_expr = PythonExpression(owner, 'CT_from_t', [abs_sal_expr, 'poc_TEMPWAT_L1', 'poc_PRESWAT_L1'], None) | |
dens_expr = PythonExpression(owner, 'rho', [abs_sal_expr, cons_temp_expr, 'poc_PRESWAT_L1'], None) | |
dens_ctxt = ParameterContext('poc_DENSITY', param_type=ParameterFunctionType(dens_expr), variability=VariabilityEnum.TEMPORAL) | |
dens_ctxt.uom = 'kg m-3' | |
contexts.append(dens_ctxt) | |
return contexts | |
from pyon.ion.stream import StandaloneStreamPublisher | |
from ion.services.dm.utility.granule import RecordDictionaryTool | |
import gevent | |
class Streamer(object): | |
def __init__(self, stream_id, route, stream_def_id, interval=1): | |
self.publisher = StandaloneStreamPublisher(stream_id, route) | |
self.i=0 | |
self.interval = interval | |
self.stream_def_id = stream_def_id | |
self.g = gevent.spawn(self.run) | |
self.finished = False | |
def run(self): | |
while not self.finished: | |
gevent.sleep(self.interval) | |
rdt = RecordDictionaryTool(stream_definition_id=self.stream_def_id) | |
rdt['poc_TIME'] = time.time() + 2208988800 | |
rdt['poc_LAT'] = 41.5 | |
rdt['poc_LON'] = -71.5 | |
rdt['poc_TEMPWAT_L0'] = np.random.random_sample(1)*(350000-280000)+280000 | |
rdt['poc_CONDWAT_L0'] = np.random.random_sample(1)*(750000-100000)+100000 | |
rdt['poc_PRESWAT_L0'] = np.random.random_sample(1)*(10000-2789)+2789 | |
self.i += 1 | |
self.publisher.publish(rdt.to_granule()) | |
def close(self): | |
self.finished = True | |
self.g.join(5) | |
self.g.kill() | |
def quick_write(stream_id, route, stream_def_id, values=1000): | |
raise NotImplementedError("No!!") | |
# publisher = StandaloneStreamPublisher(stream_id, route) | |
# stream_def_id = stream_def_id | |
# rdt = RecordDictionaryTool(stream_definition_id=stream_def_id) | |
# rdt['time'] = np.arange(values) | |
# rdt['temp'] = np.arange(values) | |
# | |
# publisher.publish(rdt.to_granule()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment