#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2014, 2015 Martin Raspaud

# Author(s):

#   Martin Raspaud <[email protected]>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Test a new approach to the producer.
"""

from trollduction.xml_read import ProductList
import Queue

from mpop.utils import debug_on
debug_on()


def process_pl(elt, info):
    """Flatten the product-list element *elt*, combining its attributes with *info*."""
    flat_list = []
    for item in elt:
        new_info = info.copy()
        for key, val in item.attrib.items():
            # Prefix "id" and "name" with the element tag to keep them unique.
            if key in ["id", "name"]:
                key = item.tag + key
            new_info[key] = val
        # item.text is None for empty elements, so guard before stripping.
        if item.text and item.text.strip():
            return item.text.strip(), new_info
        else:
            flat_list.append(process_pl(item, new_info))
    return flat_list
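
# Note: process_pl is an earlier, function-based take on flattening the product
# list; the Processor classes below are what the test at the bottom exercises.
# A minimal, hypothetical usage sketch (the element and attribute names here are
# assumptions, not taken from the real product list):
#
#   from xml.etree import ElementTree as ET
#   root = ET.fromstring('<group id="g1">'
#                        '<file output_dir="/tmp">out.png</file>'
#                        '</group>')
#   process_pl(root, {})
#   # -> ('out.png', {'output_dir': '/tmp'})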

fname = "/home/a001673/usr/src/pytroll-config/etc/polar_product_list.xml"

from mpop.satellites import GenericFactory as GF
import logging
from trollsched.satpass import Pass

LOGGER = logging.getLogger(__name__)


def create_scene_from_message(msg):
    """Parse the message *msg* and return a corresponding MPOP scene.
    """
    if msg.type in ["file", "collection", "dataset"]:
        return create_scene_from_mda(msg.data)


def create_scene_from_mda(mda):
    """Read the metadata *mda* and return a corresponding MPOP scene.
    """
    time_slot = (mda.get('start_time') or
                 mda.get('nominal_time') or
                 mda.get('end_time'))

    # orbit is not given for GEO satellites, use None
    if 'orbit_number' not in mda:
        mda['orbit_number'] = None

    platform = mda["platform_name"]

    LOGGER.info("platform %s time %s", str(platform), str(time_slot))

    if isinstance(mda['sensor'], (list, tuple, set)):
        sensor = mda['sensor'][0]
    else:
        sensor = mda['sensor']

    # Create satellite scene
    global_data = GF.create_scene(satname=str(platform),
                                  satnumber='',
                                  instrument=str(sensor),
                                  time_slot=time_slot,
                                  orbit=mda['orbit_number'],
                                  variant=mda.get('variant', ''))
    LOGGER.debug("Creating scene for satellite %s", str(platform))

    if mda['orbit_number'] is not None or mda.get('orbit_type') == "polar":
        global_data.overpass = Pass(platform,
                                    mda['start_time'],
                                    mda['end_time'],
                                    instrument=sensor)

    # Update missing information to global_data.info{}
    # TODO: this should be fixed in mpop.
    global_data.info.update(mda)
    global_data.info['time'] = time_slot

    return global_data
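
# Illustrative call of create_scene_from_mda (values borrowed from the test
# message at the bottom of this file; datetime objects stand in for the parsed
# time strings):
#
#   from datetime import datetime
#   scene = create_scene_from_mda({"platform_name": "NOAA-15",
#                                  "sensor": "avhrr/3",
#                                  "orbit_number": 87548,
#                                  "start_time": datetime(2015, 3, 15, 9, 42, 28),
#                                  "end_time": datetime(2015, 3, 15, 9, 51, 16)})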


class AbstractProcessor(object):
    """Base class for processors; subclasses fill *self.data* with child processors."""

    def _process_children(self, stuff):
        for child in self.data:
            child.process(stuff)

    def required_channels(self, scene):
        req_chans = set()
        for child in self.data:
            req_chans |= child.required_channels(scene)
        return req_chans


class PLProcessor(AbstractProcessor):
    """Processor for a whole product list: builds a scene from the message and hands it to the groups."""

    def __init__(self, pl):
        self.info = pl.attrib
        self.data = [GroupProcessor(grp, self.info) for grp in pl.groups]

    def process(self, msg):
        self._process_children(create_scene_from_message(msg))


class GroupProcessor(AbstractProcessor):
    """Processor for a group of areas: loads the needed channels, then delegates to the children."""

    def __init__(self, group, params):
        self.info = params.copy()
        self.info.update(group.info)
        self.info["groupid"] = self.info["id"]
        del self.info["id"]
        if "name" in self.info:
            self.info["groupname"] = self.info["name"]
            del self.info["name"]
        self.data = [processors.get(ch.tag, Processor)(ch, self.info)
                     for ch in group.data]

    def get_def_names(self):
        return [area.info["areaid"] for area in self.data]

    def process(self, global_data):
        # check coverage
        # load data
        req_channels = self.required_channels(global_data)
        global_data.load(req_channels, area_def_names=self.get_def_names())
        self._process_children(global_data)
        if self.info.get("unload", False):
            loaded_channels = [chn.name for chn
                               in global_data.loaded_channels()]
            global_data.unload(*loaded_channels)
            LOGGER.debug("unloading all channels after group %s",
                         self.info["groupid"])


class Processor(AbstractProcessor):
    """Generic processor for a product-list element; children are built recursively."""

    def __init__(self, elt, params):
        self.info = params.copy()
        self.info.update(elt.attrib)
        # Prefix "id" and "name" with the element tag to keep them unique.
        for key in ["id", "name"]:
            if key in self.info:
                self.info[elt.tag + key] = self.info[key]
                del self.info[key]
        self.data = [processors.get(ch.tag, Processor)(ch, self.info)
                     for ch in elt]
        if not self.data:
            # Leaf element: keep its text (e.g. an output filename pattern).
            self.data = elt.text


class FileProcessor(Processor):
    """Leaf processor: adds an overlay if configured and saves the image."""

    def process(self, img):
        # todo: don't do overlay here, it could be done once for all...
        if "overlay" in self.info:
            img.add_overlay_from_config(self.info["overlay"])
        # todo: link, trollsift compose
        from trollsift import compose
        img.save(compose(self.data, self.info))


class AreaProcessor(Processor):
    """Projects the scene onto the area, then processes the children on the local data."""

    def process(self, global_data):
        local_data = global_data.project(self.info["areaid"],
                                         self.required_channels(global_data))
        self._process_children(local_data)


class ProdProcessor(Processor):
    """Generates the composite named by *productid* and passes the image to its children."""

    queue = Queue.Queue()

    def process(self, scene):
        fun = getattr(scene.image, self.info["productid"])
        img = fun()
        self._process_children(img)

    def required_channels(self, scene):
        try:
            composite = getattr(scene.image, self.info["productid"])
        except AttributeError:
            LOGGER.debug("Composite %s not available",
                         self.info['productid'])
            return set()
        else:
            return composite.prerequisites

# Dispatch table mapping product-list XML tags to processor classes.
processors = {
    "area": AreaProcessor,
    "product": ProdProcessor,
    "file": FileProcessor,
}
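
# The processors imply a nesting of group > area > product > file. A
# hypothetical sketch of such a product list (tag nesting inferred from the
# classes above, not copied from the real polar_product_list.xml):
#
#   <group id="...">
#     <area id="...">
#       <product id="overview">
#         <file>{areaid}_{productid}.png</file>
#       </product>
#     </area>
#   </group>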


from mock import patch, MagicMock
import mpop


@patch('mpop.scene.SatelliteInstrumentScene.load')
@patch('mpop.scene.SatelliteInstrumentScene.project')
@patch('trollsift.compose')
def test(new_compose, new_project, new_load):
    new_load.return_value = mpop.scene.SatelliteInstrumentScene()
    new_project.return_value = mpop.scene.SatelliteInstrumentScene()
    new_project.return_value.image = MagicMock()
    new_load.return_value.image = MagicMock()
    # Make the patched compose return the pattern unchanged (rebinding the
    # argument, as the original did, would not affect the patched function).
    new_compose.side_effect = lambda fmt, info: fmt

    pl = ProductList(fname)
    proc = PLProcessor(pl)

    from posttroll.message import Message
    message = Message(
        rawstr="""pytroll://AAPP-HRPT/1b/norrköping/utv/polar/direct_readout/ file [email protected] 2015-03-15T09:51:30.793830 v1.01 application/json {"uid": "hrpt_noaa15_20150315_0942_87549.l1b", "format": "AAPP-HRPT", "type": "binary", "start_time": "2015-03-15T09:42:28", "orbit_number": 87548, "uri": "ssh://safe.smhi.se//san1/pps/import/PPS_data/source/noaa15_20150315_0942_87549/hrpt_noaa15_20150315_0942_87549.l1b", "platform_name": "NOAA-15", "end_time": "2015-03-15T09:51:16", "sensor": "avhrr/3", "data_processing_level": "1b"}""")

    proc.process(message)


if __name__ == '__main__':
    test()