Created
March 8, 2016 18:09
-
-
Save thearn/38fa31ab5a0712ebfcc8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from openmdao.core.component import Component | |
from pycycle.constants import R_UNIVERSAL_ENG, MIN_VALID_CONCENTRATION | |
from openmdao.core.options import OptionsDictionary | |
from openmdao.components.indep_var_comp import IndepVarComp | |
from ad import adnumber | |
from ad.admath import log | |
class ChemEq(Component): | |
""" Find the equilibirum composition for a given gaseous mixture """ | |
def __init__(self, thermo, mode="T"): | |
super(ChemEq, self).__init__() | |
self.thermo = thermo | |
self.mode = mode | |
self.num_prod = num_prod = thermo.num_prod | |
self.num_element = num_element = thermo.num_element | |
self.fd_options['step_size'] = 1e-6 | |
self.fd_options['step_type'] = "absolute" | |
self.solver_options = OptionsDictionary() | |
self.solver_options.add_option('tol', 1e-10, desc="internal solver tolerance") | |
self.solver_options.add_option('maxiter', 150, desc="internal solver maximum iteration") | |
# Input vars | |
self.add_param('init_prod_amounts', val=thermo.init_prod_amounts, shape=(num_prod,), | |
desc="initial mass fractions of products, before equilibrating") | |
self.add_param('P', val=1.0, units="bar", desc="Pressure") | |
self.state_meta = {} | |
if mode=="T": | |
self.add_param('T', val=284., units="degK", desc="Temperature") | |
elif mode=="h" or mode=="S": | |
self.state_meta['T'] = {'log':True, 'idx':num_prod+num_element, 'under_relax':False} | |
self.add_output('T', val=284., units="degK", desc="Temperature", | |
**self.state_meta['T']) | |
else: | |
raise ValueError('Only "T", "h", or "S" are allowed values for mode') | |
if mode == "h": | |
self.add_param('h', val=0., units="cal/g", description="Enthalpy") | |
elif mode == "S": | |
self.add_param('S', val=0., units="cal/(g*degK)", description="Entropy") | |
# State vars | |
self.n_init = np.ones(num_prod) / num_prod / 10 # thermo.init_prod_amounts | |
self.state_meta['n'] = {'log':True, 'idx':slice(0,num_prod), 'under_relax':True} | |
self.add_output('n', val=self.n_init, shape=num_prod, | |
desc="mole fractions of the mixture", | |
**self.state_meta['n']) | |
self.state_meta['pi'] = {'log':False, 'idx':slice(num_prod,num_prod+num_element), 'under_relax':False} | |
self.add_output('pi', val=np.zeros(num_element), | |
desc="modified lagrange multipliers from the Gibbs lagrangian", | |
**self.state_meta['pi']) | |
# Outputs | |
self.add_output('b0', shape=num_element, # when converged, b0=b | |
desc='assigned kg-atoms of element i per total kg of reactant for the initial prod amounts') | |
self.add_output('n_moles', shape=1, desc="1/molecular weight of gas") | |
# allocate the newton Jacobian | |
self.size = size = num_prod + num_element | |
if mode != "T": | |
size += 1 # added T as a state variable | |
self._dRdy = np.zeros((size, size)) | |
self._R = np.zeros(size) | |
self.total_iters = 0 | |
self.DEBUG=False | |
def _apply_nonlinear(self, params, unknowns, resids): | |
""" | |
Total derivs = dF/dX - dF/dy[(dR/dy)^-1 (dR/dx)] | |
assumed ordering in all arrays: | |
params: | |
mode var (T, h, or S), P, init_prod_amounts | |
states: | |
n, pi, T (if mode is h or S) | |
outputs: | |
b0, n_moles, n, pi, T (if mode is h or S) | |
""" | |
thermo = self.thermo | |
n = unknowns['n'] | |
#n = abs(np.random.randn(n.size)) | |
pi = unknowns['pi'] | |
if self.mode != "T": | |
T = unknowns['T'] | |
else: | |
T = params['T'] | |
if self.mode == "h": | |
h = params['h'] | |
h = adnumber(h) | |
if self.mode == "S": | |
S = params['S'] | |
S = adnumber(S) | |
P = params['P'] + abs(np.random.randn(1)*10) | |
ipa = params['init_prod_amounts'] | |
ipa = adnumber(ipa) | |
T = adnumber(T) | |
P = adnumber(P) | |
n = adnumber(n) | |
pi = adnumber(pi) | |
n_moles_old = unknowns['n_moles'].copy() | |
n_moles = unknowns['n_moles'] = np.sum(n) | |
resids['n_moles'] = n_moles - n_moles_old | |
self.H0_T = H0_T = thermo.H0(T) | |
self.S0_T = S0_T = thermo.S0(T) | |
self.mu = H0_T - S0_T + log(n) + log(P) - log(n_moles) | |
# residuals from the gibbs energy minimization | |
resids_n = self.mu - np.sum(pi*thermo.aij.T, axis=1) | |
resids['n'] = resids_n | |
b0_old = unknowns['b0'].copy() | |
b0 = unknowns['b0'] = np.sum(thermo.aij*ipa, axis=1) # vectorization for speedup | |
resids_b0 = b0 - b0_old | |
resids['b0'] = resids_b0 | |
# residuals from the conservation of mass | |
resids_pi = np.sum(thermo.aij*n, axis=1) - b0 | |
resids['pi'] = resids_pi | |
if self.mode == "h": | |
self.sum_n_H0_T = np.sum(n*H0_T) | |
resids_t = h - self.sum_n_H0_T*R_UNIVERSAL_ENG*T | |
resids['T'] = resids_t | |
elif self.mode == "S": | |
resids_t = S- R_UNIVERSAL_ENG*np.sum(n*(S0_T-log(n)+log(n_moles)-log(P))) | |
resids['T'] = resids_t | |
deriv = {} | |
deriv['n', 'T'] = thermo.H0_applyJ(T, 1.0) - thermo.S0_applyJ(T, 1.0) | |
deriv['n', 'P'] = np.ones(self.num_prod)/P | |
deriv['n', 'ipa'] = np.zeros(self.num_prod) | |
#[i.gradient(ipa)[0] for i in resids_n] | |
deriv['pi', 'T'] = np.zeros(self.num_element) | |
deriv['pi', 'P'] = np.zeros(self.num_element) | |
deriv['pi', 'ipa'] = -thermo.aij | |
deriv['T', 'P'] = 0.0 | |
if self.mode == "S": | |
deriv['T', 'P'] = R_UNIVERSAL_ENG * 1/P * sum(n) | |
deriv['T', 'ipa'] = np.zeros(self.num_prod) | |
# VERIFY DR/DY | |
self._calc_dRdy(params, unknowns) | |
# # dn/dn | |
print "resid_n, n" | |
d1 = [i.gradient(n) for i in resids_n] | |
d2 = self._dRdy[:self.num_prod, :self.num_prod] | |
print np.linalg.norm(d1 - d2) | |
print "resid_n, pi" | |
d1 = [i.gradient(pi) for i in resids_n] | |
end_element = self.num_prod+self.num_element | |
d2 = self._dRdy[:self.num_prod, self.num_prod:end_element] | |
print np.linalg.norm(d1 - d2) | |
print "resid_pi, n" | |
d1 = [i.gradient(n) for i in resids_pi] | |
d2 = self._dRdy[self.num_prod:end_element, :self.num_prod] | |
print np.linalg.norm(d1 - d2) | |
print "resid_pi, pi" | |
d1 = [i.gradient(pi) for i in resids_pi] | |
d2 = self._dRdy[self.num_prod:end_element,self.num_prod:end_element] | |
print np.linalg.norm(d1 - d2) | |
print "resid_pi, P" | |
d1 = [i.gradient(P) for i in resids_pi] | |
print d1 | |
print deriv['pi', 'P'] | |
print "resids_n, P" | |
d1 = [i.gradient(P) for i in resids_n] | |
print d1 | |
print deriv['n', 'P'] | |
print "resids_t, P" | |
d1 = resids_t.gradient(P) | |
print d1 | |
print deriv['T', 'P'] | |
# # ------------- | |
if self.mode != "T": | |
print "resid_t, t" | |
d1 = resids_t.gradient(T) | |
d2 = self._dRdy[-1, -1] | |
print np.linalg.norm(d1 - d2) | |
print "resid_t, n" | |
d1 = resids_t.gradient(n) | |
d2 = self._dRdy[-1, :self.num_prod] | |
print np.linalg.norm(d1 - d2) | |
print "resid_t, pi" | |
d1 = resids_t.gradient(pi) | |
d2 = self._dRdy[-1, self.num_prod:end_element] | |
print np.linalg.norm(d1 - d2) | |
print "resid_n, t" | |
d1 = [i.gradient(T)[0] for i in resids_n] | |
d2 = self._dRdy[:self.num_prod, -1] | |
print np.linalg.norm(d1 - d2) | |
print "resid_pi, t" | |
d1 = [i.gradient(T) for i in resids_pi] | |
d2 = self._dRdy[self.num_prod:end_element, -1] | |
print np.linalg.norm(d1 - d2) | |
# ----------------- | |
#print d1 | |
# VERIFY DR/DX | |
inputs = [T, P, ipa] | |
states = [resids_n, resids_pi] | |
inpn = ["T", "P", "ipa"] | |
stn = ["n", "pi"] | |
from pprint import pprint | |
s_ = 0 | |
for s in states: | |
i_ = 0 | |
for i in inputs: | |
print inpn[i_], stn[s_] | |
if inpn[i_] == "T" and stn[s_] == "n": | |
d1 = [k.gradient(i)[0] for k in s] | |
else: | |
d1 = [k.gradient(i) for k in s] | |
d2 = deriv[stn[s_], inpn[i_]] | |
print np.linalg.norm(d1 - d2) | |
print "-----" | |
i_ +=1 | |
s_ +=1 | |
quit() | |
def _calc_dRdy(self, params, unknowns): | |
""" computes the jacobian for the newton solver """ | |
thermo = self.thermo | |
aij = thermo.aij | |
num_prod = self.num_prod | |
n = unknowns['n'] | |
dRdy = self._dRdy | |
# dRgibbs_dn | |
dRdy[:num_prod, :num_prod] = -1/np.sum(n) | |
np.fill_diagonal(dRdy[:num_prod, :num_prod], 1/n - 1/np.sum(n)) | |
mode = self.mode | |
if mode != "T": | |
# dRgibbs_dT | |
T = unknowns['T'] | |
self.dH0_dT = thermo.H0_applyJ(T, 1) | |
self.dS0_dT = thermo.S0_applyJ(T, 1) | |
dRdy[:num_prod,-1] = self.dH0_dT - self.dS0_dT | |
# dRmass_dT = 0 | |
if mode == "h": | |
# dRT_dn | |
dRdy[-1,:num_prod] = - R_UNIVERSAL_ENG*T*self.H0_T | |
# dRT_dT | |
dRdy[-1,-1] = -R_UNIVERSAL_ENG*(T*np.sum(n*self.dH0_dT) + self.sum_n_H0_T) | |
if mode == "S": | |
P = params['P'] | |
n_moles = unknowns['n_moles'] | |
# dRT_dn = 0 | |
dRdy[-1,:num_prod] = -R_UNIVERSAL_ENG*(self.S0_T - np.log(n)+np.log(n_moles)-np.log(P)) | |
# dRT_dT = 0 | |
#uc*(S0_T + np.log(sum_nj) - np.log(P) - np.log(nj)) | |
valid_products = n > MIN_VALID_CONCENTRATION | |
dRdy[-1,-1] = -R_UNIVERSAL_ENG*np.sum(n[valid_products]*self.dS0_dT[valid_products]) | |
end_element = num_prod+self.num_element | |
# dRgibbs_dpi | |
dRdy[:num_prod, num_prod:end_element] = -aij.T | |
# dRmass_dn | |
dRdy[num_prod:end_element, :self.num_prod] = aij | |
return dRdy | |
def _build_R_vec(self, resids): | |
# assemble the residual vector for the solver | |
R = self._R | |
for s_var, s_meta in self.state_meta.items(): | |
R[s_meta['idx']] = resids[s_var] | |
return R | |
def solve_nonlinear(self, params, unknowns, resids): | |
local_resids = resids.vec.copy() | |
R_gibbs = resids['n'] | |
# R_mass = resids['pi'] | |
n = unknowns['n'] | |
# n[n<=0] = 1e-5 # hack! | |
self._apply_nonlinear(params, unknowns, resids) | |
R = self._build_R_vec(resids) | |
err = np.linalg.norm(R) | |
if err > 1e-1: #sometimes the last converged point is not a good guess | |
n = unknowns['n'] = self.n_init | |
self._apply_nonlinear(params, unknowns, resids) | |
R = self._build_R_vec(resids) | |
err = np.linalg.norm(R) | |
self.iter_count = 1 | |
tol = self.solver_options['tol'] | |
maxiter = self.solver_options['maxiter'] | |
while err > tol and self.iter_count < maxiter: | |
if self.DEBUG: | |
print(self.iter_count, R, err) | |
# raw_input() | |
self._calc_dRdy(params, unknowns) | |
delta_y = np.linalg.solve(self._dRdy, -R) | |
valid_products = n > MIN_VALID_CONCENTRATION | |
R_max = abs(5*R_gibbs[-1]) | |
if np.any(valid_products): | |
R_max = max(R_max, np.max(np.abs(R_gibbs[valid_products]))) | |
# lambdaf is an under-relaxation factor that will be 1 except in very unconverged states | |
lambdaf = 2 / (R_max+1e-20) | |
if (lambdaf > 1): | |
lambdaf = 1 | |
for s_var,s_meta in self.state_meta.items(): | |
idx = s_meta['idx'] | |
if s_meta['log']: | |
step = delta_y[idx]/unknowns[s_var] | |
if np.isscalar(step): | |
if step>1e4: | |
step = 1e4 | |
elif step<-1e4: | |
step = -1e4 | |
else: | |
step[step>1e4] = 1e4 | |
step[step<-1e4] = -1e4 | |
if s_meta['under_relax']: | |
delta_s = np.exp(lambdaf*step) | |
else: | |
delta_s = np.exp(step) | |
unknowns[s_var] *= delta_s | |
else: | |
delta_s = delta_y[idx] | |
unknowns[s_var] += delta_s | |
self._apply_nonlinear(params, unknowns, resids) | |
R = self._build_R_vec(resids) | |
err = np.linalg.norm(R) | |
self.iter_count += 1 | |
resids.vec[:] = local_resids | |
# print "finished chem_eq solve", self.iter_count, err | |
def jacobian(self, params, unknowns, resids): | |
dRdy = self._dRdy | |
num_prod = self.num_prod | |
num_element = self.num_element | |
thermo = self.thermo | |
nb0 = num_element | |
nn = num_prod | |
npi = num_element | |
ninit = num_prod | |
# MODE = T | |
if self.mode == "T": | |
nF = num_prod + 2*num_element + 1 | |
ny = num_prod + num_element | |
else: | |
nF = num_prod + 2*num_element + 2 | |
ny = num_prod + num_element + 1 | |
nx = num_prod + 2 | |
""" | |
Total derivs = dF/dX - dF/dy[(dR/dy)^-1 (dR/dx)] | |
assumed ordering in all arrays: | |
params: | |
mode var (T, h, or S), P, init_prod_amounts | |
states: | |
n, pi, T (if mode is h or S) | |
outputs: | |
b0, n_moles, n, pi, T (if mode is h or S) | |
""" | |
# param index locations for dFdx, dydx, and total deriv j | |
mode_idx = 0 | |
P_idx = 1 | |
ipa_idx = slice(2, 2 + ninit) | |
# state index locations (not used here, just for reference) | |
# but is consistent with dRdy | |
state_n_idx = slice(0, nn) | |
state_pi_idx = slice(state_n_idx.stop, state_n_idx.stop + npi) | |
state_T_idx = state_pi_idx.stop | |
# output index locations for dFdx, dFdy, and total deriv j | |
b0_idx = slice(0, num_element) | |
n_moles_idx = b0_idx.stop | |
n_idx = slice(n_moles_idx, n_moles_idx + nn) | |
pi_idx = slice(n_idx.stop, n_idx.stop + npi) | |
t_idx = pi_idx.stop | |
## output wrt input | |
dFdx = np.zeros((nF, nx)) | |
# db0/dinitprod | |
dFdx[b0_idx, ipa_idx] = self.thermo.aij | |
# dpi/d_initprod | |
dFdx[pi_idx, ipa_idx] = -self.thermo.aij | |
if self.mode == "T": | |
T = params['T'] | |
# dn/dt | |
dFdx[n_idx, mode_idx] = thermo.H0_applyJ(T, 1) - thermo.S0_applyJ(T, 1) | |
else: | |
# dT/ds or dT/dh | |
dFdx[t_idx, mode_idx] = 1 | |
## output wrt states | |
dFdy = np.zeros((nF, ny)) | |
# dnmoles/dn | |
dFdy[n_moles_idx, :num_prod] = 1 | |
# dn/dn | |
dFdy[n_idx, : num_prod] = 1 | |
# dpi/dpi | |
dFdy[pi_idx, num_prod:] = 1 | |
## residuals wrt inputs | |
dRdx = np.zeros((ny, nx)) | |
dRdx[:nx,:] = np.eye(nx) | |
## states wrt inputs (matrix-matrix system) | |
dydx = np.linalg.solve(dRdy, dRdx) | |
## total derivs | |
j = dFdx - dFdy.dot(dydx) | |
J = {} | |
J['b0', self.mode] = j[b0_idx, mode_idx].reshape((nb0,1)) | |
J['b0', 'P'] = j[b0_idx, P_idx].reshape((nb0,1)) | |
J['b0', 'init_prod_amounts'] = j[b0_idx, ipa_idx].reshape((nb0,ninit)) | |
J['n_moles', self.mode] = j[n_moles_idx, mode_idx].reshape((1,1)) | |
J['n_moles', 'P'] = j[n_moles_idx, P_idx].reshape((1,1)) | |
J['n_moles', 'init_prod_amounts'] = j[n_moles_idx, ipa_idx].reshape((1,ninit)) | |
J['n', self.mode] = j[n_idx, mode_idx].reshape((nn,1)) | |
J['n', 'P'] = j[n_idx, P_idx].reshape((nn,1)) | |
J['n', 'init_prod_amounts'] = j[n_idx, ipa_idx].reshape((nn,ninit)) | |
J['pi', self.mode] = j[pi_idx, mode_idx].reshape((npi,1)) | |
J['pi', 'P'] = j[pi_idx, P_idx].reshape((npi,1)) | |
J['pi', 'init_prod_amounts'] = j[pi_idx, ipa_idx].reshape((npi,ninit)) | |
if self.mode == "S" or self.mode == "h": | |
J['T', self.mode] = j[t_idx, mode_idx].reshape((1,1)) | |
J['T', 'P'] = j[t_idx, P_idx].reshape((1,1)) | |
J['T', 'init_prod_amounts'] = j[t_idx, ipa_idx].reshape((1,ninit)) | |
print J['T', 'P'] | |
return J | |
if __name__ == "__main__": | |
from openmdao.api import Problem, Group | |
from pycycle import species_data | |
thermo = species_data.Thermo(species_data.janaf, {'H': 1, 'CO2': 1, 'Ar': 1, 'O': 1, 'N': 1}) | |
ipa= np.array([ 3.15433423e-04, 0.00000000e+00, 8.69292039e-04, | |
0.00000000e+00, 1.07446088e-05, 3.82488497e-06, | |
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, | |
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, | |
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, | |
2.63003773e-02, 0.00000000e+00, 0.00000000e+00, | |
7.05560402e-03]) | |
P= 31.026407819265 | |
h= 106.9708225318558 | |
T= 1626.1565398377627 | |
def make(): | |
prob = Problem() | |
prob.root = Group() | |
prob.root.add('chemeq', ChemEq(thermo, mode="h"), promotes=["*"]) | |
params = ( | |
('P', P, {'units':'psi'}), | |
('h', h, {'units':'cal/g'}), | |
('init_prod_amounts', ipa), | |
) | |
prob.root.add('des_vars', IndepVarComp(params), promotes=["*"]) | |
prob.setup(check=False) | |
return prob | |
prob = make() | |
prob.run() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment