Last active
July 23, 2020 22:05
-
-
Save bbrelje/f8a16ee42000f385973b85dd45ed6107 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import openmdao.api as om | |
import numpy as np | |
from openconcept.utilities.math.integrals import NewIntegrator | |
import warnings | |
# OpenConcept PhaseGroup will be used to hold analysis phases with time integration | |
# =============== These will go in OpenConcept Core ===============# | |
def find_integrators_in_model(system, abs_namespace, timevars, states): | |
durationvar = system._problem_meta['oc_time_var'] | |
# check if we are a group or not | |
if isinstance(system, om.Group): | |
for subsys in system._subsystems_allprocs: | |
if not abs_namespace: | |
next_namespace = subsys.name | |
else: | |
next_namespace = abs_namespace + '.' + subsys.name | |
find_integrators_in_model(subsys, next_namespace, timevars, states) | |
else: | |
# if the duration variable shows up we need to add its absolute path to timevars | |
if isinstance(system, NewIntegrator): | |
for varname in system._var_rel_names['input']: | |
if varname == durationvar: | |
timevars.append(abs_namespace + '.' + varname) | |
for state in system._state_vars.keys(): | |
states.append(abs_namespace + '.' + state) | |
class PhaseGroup(om.Group): | |
def __init__(self, **kwargs): | |
num_nodes = kwargs.get('num_nodes', 11) | |
super(PhaseGroup, self).__init__(**kwargs) | |
self._oc_time_var_name = 'duration' | |
self._oc_num_nodes = num_nodes | |
def _setup_procs(self, pathname, comm, mode, prob_meta): | |
# need to pass down the name of the duration variable via prob_meta | |
prob_meta.update({'oc_time_var': self._oc_time_var_name}) | |
prob_meta.update({'oc_num_nodes': self._oc_num_nodes}) | |
super(PhaseGroup, self)._setup_procs(pathname, comm, mode, prob_meta) | |
def _configure(self): | |
super(PhaseGroup, self)._configure() | |
# check child subsys for variables to be integrated and add them all | |
timevars = [] | |
states = [] | |
find_integrators_in_model(self, '', timevars, states) | |
# make connections from duration to integrated vars automatically | |
for var_abs_address in timevars: | |
self.connect(self._oc_time_var_name, var_abs_address) | |
self._oc_states_list = states | |
class IntegratorGroup(om.Group): | |
def _setup_procs(self, pathname, comm, mode, prob_meta): | |
# TODO allow non-second duration units | |
num_nodes = prob_meta['oc_num_nodes'] | |
self.add_subsystem('ode_integ', NewIntegrator(time_setup='duration', diff_units='s', num_nodes=num_nodes)) | |
super(IntegratorGroup, self)._setup_procs(pathname, comm, mode, prob_meta) | |
def _configure(self): | |
super(IntegratorGroup, self)._configure() | |
for subsys in self._subsystems_allprocs: | |
for var in subsys._var_rel_names['output']: | |
# check if there are any variables to integrate | |
tags = subsys._var_rel2meta[var]['tags'] | |
if 'integrate' in tags: | |
state_name = None | |
state_units = None | |
state_val = 0.0 | |
state_lower = -1e30 | |
state_upper = 1e30 | |
state_promotes = False | |
for tag in tags: | |
split_tag = tag.split(':') | |
if split_tag[0] == 'state_name': | |
state_name = split_tag[-1] | |
elif split_tag[0] == 'state_units': | |
state_units = split_tag[-1] | |
elif split_tag[0] == 'state_val': | |
state_val = eval(split_tag[-1]) | |
elif split_tag[0] == 'state_lower': | |
state_lower = float(split_tag[-1]) | |
elif split_tag[0] == 'state_upper': | |
state_upper = float(split_tag[-1]) | |
elif split_tag[0] == 'state_promotes': | |
state_promotes = eval(split_tag[-1]) | |
if state_name is None: | |
raise ValueError('Must provide a state_name tag for integrated variable '+subsys.name+'.'+var) | |
if state_units is None: | |
warnings.warn('OpenConcept integration variable '+subsys.name+'.'+var+' '+'has no units specified. This can be dangerous.') | |
self.ode_integ.add_integrand(state_name, rate_name=var, val=state_val, | |
units=state_units, lower=state_lower, upper=state_upper) | |
# make the rate connection | |
self.connect(subsys.name+'.'+var, 'ode_integ'+'.'+var) | |
if state_promotes: | |
self.ode_integ._var_promotes['output'].append(state_name) | |
class Trajectory(om.Group): | |
def __init__(self, **kwargs): | |
super(Trajectory, self).__init__(**kwargs) | |
self._oc_phases_to_link = [] | |
def _configure(self): | |
super(Trajectory, self)._configure() | |
for linkage in self._oc_phases_to_link: | |
self._link_phases(linkage[0], linkage[1], linkage[2]) | |
def _link_phases(self, phase1, phase2, states_to_skip=None): | |
# find all the states in each phase | |
# if they appear in both phase1 and phase2, connect them | |
# unless the state is in states_to_skip | |
# if they do not appear in both, do nothing or maybe raise an error message | |
# print a report of states linked | |
phase1_states = phase1._oc_states_list | |
phase2_states = phase2._oc_states_list | |
for state in phase1_states: | |
if state in phase2_states: | |
state_phase1_abs_name = phase1.name + '.' + state + '_final' | |
state_phase2_abs_name = phase2.name + '.' + state + '_initial' | |
self.connect(state_phase1_abs_name, state_phase2_abs_name) | |
def link_phases(self, phase1, phase2, states_to_skip=None): | |
# need to cache this because the data we need isn't ready yet | |
self._oc_phases_to_link.append((phase1, phase2, states_to_skip)) | |
# --------------------- This is just an example | |
class NewtonSecondLaw(om.ExplicitComponent): | |
"A regular OpenMDAO component" | |
def initialize(self): | |
self.options.declare('num_nodes', default=1) | |
def setup(self): | |
num_nodes = self.options['num_nodes'] | |
self.add_input('mass', val=2.0*np.ones((num_nodes,)), units='kg') | |
self.add_input('force', val=1.0*np.ones((num_nodes,)), units='N') | |
# mark the output variable for integration using openmdao tags | |
self.add_output('accel', val=0.5*np.ones((num_nodes,)), units='m/s**2', tags=['integrate', | |
'state_name:velocity', | |
'state_units:m/s', | |
'state_val:5.0', | |
'state_promotes:True']) | |
self.declare_partials(['*'], ['*'], method='cs') | |
def compute(self, inputs, outputs): | |
outputs['accel'] = inputs['force'] / inputs['mass'] | |
class DragEquation(om.ExplicitComponent): | |
"Another regular OpenMDAO component that happens to take a state variable as input" | |
def initialize(self): | |
self.options.declare('num_nodes', default=1) | |
def setup(self): | |
num_nodes = self.options['num_nodes'] | |
self.add_input('velocity', val=0.0*np.ones((num_nodes,)), units='m/s') | |
self.add_output('force', val=0.0*np.ones((num_nodes,)), units='N') | |
self.declare_partials(['*'], ['*'], method='cs') | |
def compute(self, inputs, outputs): | |
outputs['force'] = -0.10 * inputs['velocity'] ** 2 | |
class VehicleModel(IntegratorGroup): | |
""" | |
A user wishing to integrate an ODE rate will need to subclass | |
this IntegratorGroup instead of the default OpenMDAO Group | |
but it behaves in all other ways exactly the same. | |
You can now incorporate this Vehicle Model in your model tree | |
using the regular Group. Only the direct parent of the rate | |
to be integrated has to use this special class. | |
""" | |
def initialize(self): | |
self.options.declare('num_nodes', default=11) | |
def setup(self): | |
num_nodes = self.options['num_nodes'] | |
self.add_subsystem('nsl', NewtonSecondLaw(num_nodes=num_nodes)) | |
self.add_subsystem('drag', DragEquation(num_nodes=num_nodes)) | |
# velocity output is created automatically by the integrator | |
# if you want you can promote it, or you can connect it directly as here | |
self.connect('velocity', 'drag.velocity') | |
self.connect('drag.force','nsl.force') | |
class MyPhase(PhaseGroup): | |
"An OpenConcept Phase comprises part of a time-based trajectory and always needs to have a 'duration' defined" | |
def setup(self): | |
self.add_subsystem('ivc', om.IndepVarComp('duration', val=5.0, units='s'), promotes_outputs=['duration']) | |
self.add_subsystem('vm', VehicleModel()) | |
class MyTraj(Trajectory): | |
"An OpenConcept Trajectory consists of one or more phases that may be linked together. This will often be a top-level model" | |
def setup(self): | |
self.add_subsystem('phase1', MyPhase()) | |
self.add_subsystem('phase2', MyPhase()) | |
# the link_phases directive ensures continuity of state variables across phase boundaries | |
self.link_phases(self.phase1, self.phase2) | |
if __name__ == "__main__": | |
prob = om.Problem(MyTraj()) | |
prob.model.nonlinear_solver = om.NewtonSolver(iprint=2) | |
prob.model.linear_solver = om.DirectSolver() | |
prob.model.nonlinear_solver.options['solve_subsystems'] = True | |
prob.model.nonlinear_solver.options['maxiter'] = 20 | |
prob.model.nonlinear_solver.options['atol'] = 1e-6 | |
prob.model.nonlinear_solver.options['rtol'] = 1e-6 | |
prob.setup() | |
# set the initial value of the state at the beginning of the trajectory | |
prob['phase1.vm.ode_integ.velocity_initial'] = 10.0 | |
prob.run_model() | |
prob.model.list_outputs(print_arrays=True) | |
prob.model.list_inputs(print_arrays=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment