Skip to content

Instantly share code, notes, and snippets.

@blazetopher
Created February 11, 2013 18:08
Show Gist options
  • Save blazetopher/4756260 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
@package
@file data_proc_poc
@author Christopher Mueller
@brief
"""
# IPython log file
from ion.services.dm.utility.granule_utils import time_series_domain
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
from interface.services.dm.idataset_management_service import DatasetManagementServiceClient
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient
from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
from pyon.container.cc import Container
from interface.objects import DataProduct
import numpy as np
from coverage_model import *
import time
# Module-level counter appended to stream/dataset/data-product names so that
# repeated calls to load_datasets()/load_data_product() get unique names.
dset_i = 0
def load_datasets():
    """Create the POC parameter dictionary, a stream, and a dataset, then
    wire the stream into the existing ingestion configuration.

    Returns a (stream_id, route, stream_def_id, dataset_id) tuple.
    """
    global dset_i
    pubsub = PubsubManagementServiceClient()
    dsm = DatasetManagementServiceClient()
    ingestion = IngestionManagementServiceClient()

    # Register every parameter context, then bundle them into a dictionary
    # whose temporal axis is 'poc_TIME'.
    contexts = create_params()
    ctx_ids = [dsm.create_parameter_context(c.name, c.dump()) for c in contexts]
    pdict_id = dsm.create_parameter_dictionary('poc_dict', ctx_ids, temporal_context='poc_TIME')

    stream_def_id = pubsub.create_stream_definition('std', parameter_dictionary_id=pdict_id)
    stream_id, route = pubsub.create_stream('instrument_stream_%s' % dset_i, 'xp1',
                                            stream_definition_id=stream_def_id)

    # Assumes at least one ingestion configuration already exists; the first
    # one found is used.
    ingest_config_id = ingestion.list_ingestion_configurations(id_only=True)[0]

    tdom, sdom = time_series_domain()
    dataset_id = dsm.create_dataset('instrument_dataset_%s' % dset_i,
                                    parameter_dictionary_id=pdict_id,
                                    spatial_domain=sdom.dump(),
                                    temporal_domain=tdom.dump())
    ingestion.persist_data_stream(stream_id=stream_id,
                                  ingestion_configuration_id=ingest_config_id,
                                  dataset_id=dataset_id)
    dsm.register_dataset(dataset_id)

    dset_i += 1
    return stream_id, route, stream_def_id, dataset_id
def start_stats_listener():
    """Subscribe to 'IngestionStatus' events, printing each event's __dict__.

    Returns the (subscriber, start() result) pair so the caller can keep a
    handle on the subscription.
    """
    from pyon.event.event import EventSubscriber

    def _dump_event(*args, **kwargs):
        # The first positional argument is the event object; show its fields.
        print(args[0].__dict__)

    subscriber = EventSubscriber(event_type='IngestionStatus', callback=_dump_event)
    return subscriber, subscriber.start()
def populate_dataset(dataset_id, hours):
    """Backfill `hours` hours of synthetic 1 Hz CTD data straight into the
    dataset's coverage (one timestep per second, ending "now")."""
    cov = DatasetManagementService._get_coverage(dataset_id)
    n = hours * 3600  # one sample per second
    cov.insert_timesteps(n)

    now = time.time()
    # 2208988800 converts the UNIX epoch (1970) to the NTP epoch (1900).
    cov.set_parameter_values('poc_TIME', np.arange(now - n, now) + 2208988800)
    cov.set_parameter_values('poc_LAT', 41.5)
    cov.set_parameter_values('poc_LON', -71.5)

    def _uniform(lo, hi):
        # n samples uniformly distributed over [lo, hi).
        return np.random.random_sample(n) * (hi - lo) + lo

    # SBE 37IM raw (L0) count ranges: temperature 280000-350000;
    # conductivity 100000-750000 (never 0 in seawater); pressure 2789-10000.
    cov.set_parameter_values('poc_TEMPWAT_L0', value=_uniform(280000, 350000))
    cov.set_parameter_values('poc_CONDWAT_L0', value=_uniform(100000, 750000))
    cov.set_parameter_values('poc_PRESWAT_L0', value=_uniform(2789, 10000))
def load_data_product():
    """Create a DataProduct (using the pre-loaded 'ctd_parsed_param_dict'),
    activate persistence, and look up the stream/dataset resources that
    persistence activation created.

    Returns (data_product_id, stream_id, route, stream_def_id, dataset_id).
    """
    global dset_i
    dsm = DatasetManagementServiceClient()
    pubsub = PubsubManagementServiceClient()
    dpm = DataProductManagementServiceClient()
    registry = Container.instance.resource_registry

    tdom, sdom = time_series_domain()
    dp = DataProduct(name='instrument_data_product_%i' % dset_i,
                     description='ctd stream test',
                     processing_level_code='Parsed_Canonical',
                     temporal_domain=tdom.dump(),
                     spatial_domain=sdom.dump())

    pdict_id = dsm.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = dpm.create_data_product(data_product=dp, stream_definition_id=stream_def_id)
    dpm.activate_data_product_persistence(data_product_id)

    # The stream and dataset are created as side effects of the calls above;
    # recover them through resource-registry associations.
    stream_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub.read_stream_route(stream_id)
    dataset_ids, _ = registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]

    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
def create_params():
    """Build the ParameterContext list for the POC parameter dictionary.

    Independent (L0) parameters mimic SBE 37IM hex-decoded counts; L1
    parameters are derived from them via Numexpr parameter functions, and
    practical salinity / density are computed through the gsw (Gibbs
    SeaWater) library.

    Returns the list of ParameterContext objects (not yet registered).
    """
    contexts = []

    # Temporal axis: seconds since the NTP epoch (1900-01-01).
    t_ctxt = ParameterContext('poc_TIME', param_type=QuantityType(value_encoding=np.dtype('int64')))
    t_ctxt.uom = 'seconds since 01-01-1900'
    contexts.append(t_ctxt)

    lat_ctxt = ParameterContext('poc_LAT', param_type=ConstantType(QuantityType(value_encoding=np.dtype('float32'))), fill_value=-9999)
    lat_ctxt.axis = AxisTypeEnum.LAT
    lat_ctxt.uom = 'degree_north'
    contexts.append(lat_ctxt)

    lon_ctxt = ParameterContext('poc_LON', param_type=ConstantType(QuantityType(value_encoding=np.dtype('float32'))), fill_value=-9999)
    lon_ctxt.axis = AxisTypeEnum.LON
    lon_ctxt.uom = 'degree_east'
    contexts.append(lon_ctxt)

    # --- Independent parameters (decimal results of hex conversion) ---
    temp_ctxt = ParameterContext('poc_TEMPWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999)
    temp_ctxt.uom = 'deg_C'
    contexts.append(temp_ctxt)

    cond_ctxt = ParameterContext('poc_CONDWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999)
    cond_ctxt.uom = 'S m-1'
    contexts.append(cond_ctxt)

    press_ctxt = ParameterContext('poc_PRESWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999)
    press_ctxt.uom = 'dbar'
    contexts.append(press_ctxt)

    # --- Dependent (derived) parameters ---
    # TEMPWAT_L1 = (TEMPWAT_L0 / 10000) - 10
    tl1_func = '(T_L0 / 10000) - 10'
    tl1_pmap = {'T_L0':'poc_TEMPWAT_L0'}
    expr = NumexprExpression(tl1_func, tl1_pmap)
    tempL1_ctxt = ParameterContext('poc_TEMPWAT_L1', param_type=ParameterFunctionType(expression=expr), variability=VariabilityEnum.TEMPORAL)
    tempL1_ctxt.uom = 'deg_C'
    contexts.append(tempL1_ctxt)

    # CONDWAT_L1 = (CONDWAT_L0 / 100000) - 0.5
    cl1_func = '(C_L0 / 100000) - 0.5'
    cl1_pmap = {'C_L0':'poc_CONDWAT_L0'}
    expr = NumexprExpression(cl1_func, cl1_pmap)
    condL1_ctxt = ParameterContext('poc_CONDWAT_L1', param_type=ParameterFunctionType(expression=expr), variability=VariabilityEnum.TEMPORAL)
    condL1_ctxt.uom = 'S m-1'
    contexts.append(condL1_ctxt)

    # PRESWAT_L1 = (PRESWAT_L0 * p_range / (0.85 * 65536)) - (0.05 * p_range)
    # p_range is a calibration coefficient, fixed here to 679.34040721.
    pl1_func = '(P_L0 * 679.34040721 / (0.85 * 65536)) - (0.05 * 679.34040721)'
    pl1_pmap = {'P_L0':'poc_PRESWAT_L0'}
    expr = NumexprExpression(pl1_func, pl1_pmap)
    presL1_ctxt = ParameterContext('poc_PRESWAT_L1', param_type=ParameterFunctionType(expression=expr), variability=VariabilityEnum.TEMPORAL)
    # BUGFIX: was 'S m-1' (copy/paste from the conductivity context above);
    # water pressure is in dbar, matching poc_PRESWAT_L0.
    presL1_ctxt.uom = 'dbar'
    contexts.append(presL1_ctxt)

    # Density & practical salinity calculated using the Gibbs SeaWater library,
    # available via the python-gsw project:
    # https://code.google.com/p/python-gsw/ & http://pypi.python.org/pypi/gsw/3.0.1
    # PRACSAL = gsw.SP_from_C((CONDWAT_L1 * 10), TEMPWAT_L1, PRESWAT_L1)
    owner = 'gsw'
    sal_func = 'SP_from_C'
    sal_arglist = [NumexprExpression('C*10', {'C':'poc_CONDWAT_L1'}), 'poc_TEMPWAT_L1', 'poc_PRESWAT_L1']
    sal_kwargmap = None
    expr = PythonExpression(owner, sal_func, sal_arglist, sal_kwargmap)
    sal_ctxt = ParameterContext('poc_PRACSAL', param_type=ParameterFunctionType(expr), variability=VariabilityEnum.TEMPORAL)
    # NOTE(review): practical salinity is dimensionless ('1'); 'g kg-1' reads
    # like absolute salinity — confirm intended unit before changing.
    sal_ctxt.uom = 'g kg-1'
    contexts.append(sal_ctxt)

    # absolute_salinity = gsw.SA_from_SP(PRACSAL, PRESWAT_L1, longitude, latitude)
    # conservative_temperature = gsw.CT_from_t(absolute_salinity, TEMPWAT_L1, PRESWAT_L1)
    # DENSITY = gsw.rho(absolute_salinity, conservative_temperature, PRESWAT_L1)
    owner = 'gsw'
    abs_sal_expr = PythonExpression(owner, 'SA_from_SP', ['poc_PRACSAL', 'poc_PRESWAT_L1', 'poc_LON','poc_LAT'], None)
    cons_temp_expr = PythonExpression(owner, 'CT_from_t', [abs_sal_expr, 'poc_TEMPWAT_L1', 'poc_PRESWAT_L1'], None)
    dens_expr = PythonExpression(owner, 'rho', [abs_sal_expr, cons_temp_expr, 'poc_PRESWAT_L1'], None)
    dens_ctxt = ParameterContext('poc_DENSITY', param_type=ParameterFunctionType(dens_expr), variability=VariabilityEnum.TEMPORAL)
    dens_ctxt.uom = 'kg m-3'
    contexts.append(dens_ctxt)

    return contexts
from pyon.ion.stream import StandaloneStreamPublisher
from ion.services.dm.utility.granule import RecordDictionaryTool
import gevent
class Streamer(object):
    """Publishes one random single-sample CTD granule to `stream_id` every
    `interval` seconds on a background greenlet until close() is called.

    `self.i` counts the granules published so far.
    """
    def __init__(self, stream_id, route, stream_def_id, interval=1):
        self.publisher = StandaloneStreamPublisher(stream_id, route)
        self.i = 0
        self.interval = interval
        self.stream_def_id = stream_def_id
        # BUGFIX: initialize the stop flag *before* spawning the greenlet so
        # run() can never observe the instance without `finished` set (the
        # original spawned first, risking an AttributeError in run()).
        self.finished = False
        self.g = gevent.spawn(self.run)

    def run(self):
        """Greenlet body: sleep `interval` seconds, publish one sample, repeat."""
        while not self.finished:
            gevent.sleep(self.interval)
            rdt = RecordDictionaryTool(stream_definition_id=self.stream_def_id)
            # 2208988800 shifts the UNIX epoch (1970) to the NTP epoch (1900).
            rdt['poc_TIME'] = time.time() + 2208988800
            rdt['poc_LAT'] = 41.5
            rdt['poc_LON'] = -71.5
            # SBE 37IM raw (L0) count ranges, matching populate_dataset().
            rdt['poc_TEMPWAT_L0'] = np.random.random_sample(1)*(350000-280000)+280000
            rdt['poc_CONDWAT_L0'] = np.random.random_sample(1)*(750000-100000)+100000
            rdt['poc_PRESWAT_L0'] = np.random.random_sample(1)*(10000-2789)+2789
            self.i += 1
            self.publisher.publish(rdt.to_granule())

    def close(self):
        """Ask the greenlet to stop; wait up to 5s, then hard-kill it."""
        self.finished = True
        self.g.join(5)
        self.g.kill()
def quick_write(stream_id, route, stream_def_id, values=1000):
    """Placeholder for a bulk-publish helper; intentionally disabled.

    The disabled implementation built a RecordDictionaryTool with `values`
    time/temp points and published it as a single granule.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("No!!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment