Skip to content

Instantly share code, notes, and snippets.

@mbaudin47
Last active December 27, 2024 16:36
Show Gist options
  • Select an option

  • Save mbaudin47/6838346d014a3da26898d8e6aa7e1c2b to your computer and use it in GitHub Desktop.

Select an option

Save mbaudin47/6838346d014a3da26898d8e6aa7e1c2b to your computer and use it in GitHub Desktop.
Export an OpenTURNS PCE as a standalone Python code
"""
Create a polynomial chaos for the Ishigami function and export
it as a standalone Python code.
See https://github.com/openturns/openturns/blob/master/lib/src/Uncertainty/Algorithm/MetaModel/FunctionalChaos/FunctionalChaosResult.cxx
This code produces a file which content is:
from openturns import *
def pceModel(X):
# Define the distribution
marginals = []
marginals.append(Uniform(-3.141592653589793, 3.141592653589793))
marginals.append(Uniform(-3.141592653589793, 3.141592653589793))
marginals.append(Uniform(-3.141592653589793, 3.141592653589793))
distribution = JointDistribution(marginals)
# Set the indices
indices = [0,1,6,7,9,10,15,30,32,34,35,40,49,51,65,77,83,89,98,109,119,121,125,128,156,163]#26
# Define the coefficients
coefficients = Sample([[3.5080413504746915], [1.6143454720819665], [-0.011977451617001271], [-0.5853961996906742], [-0.016533164114624046], [-1.2849614959577127], [1.3687745585574793], [-1.9578673552520318], [0.007818143465962605], [-0.013988939678967135], [0.18670006158189084], [-1.0708624964041402], [0.39138426252265074], [-0.01365068249265241], [0.0037816035001760153], [1.3343816320512953], [-0.017770803638288542], [0.16145682554646218], [-0.33151563222883595], [-0.022312021201981218], [0.0010618753230878404], [-0.026372655726279715], [-0.01805834859738003], [0.008337136150391757], [-0.3629216338749513], [-0.025780331326596802]])
# Set the basis
inputDimension = distribution.getDimension()
polynomials = PolynomialFamilyCollection(inputDimension)
for i in range(inputDimension):
marginalPolynomial = StandardDistributionPolynomialFactory(marginals[i])
polynomials[i] = marginalPolynomial
enumerate = LinearEnumerateFunction(inputDimension)
basis = OrthogonalProductPolynomialFactory(polynomials, enumerate)
# Set the function collection
function_collection = []
numberOfIndices = len(indices)
for i in range(numberOfIndices):
function_collection.append(basis.build(indices[i]))
# Set the composed metamodel, set the transformation, create the metamodel
composedMetaModel = DualLinearCombinationFunction(function_collection, coefficients)
measure = basis.getMeasure()
transformation = DistributionTransformation(distribution, measure)
metaModel = ComposedFunction(composedMetaModel, transformation)
# Evaluate the metamodel
Y = metaModel(X)
return Y
"""
# %%
# Define the model
# ----------------
# %%
from openturns.usecases import ishigami_function
import openturns as ot
# %%
# We load the Ishigami model :
im = ishigami_function.IshigamiModel()
# %%
sampleSize = 100
inputTrain = im.inputDistribution.getSample(sampleSize)
outputTrain = im.model(inputTrain)
# %%
multivariateBasis = ot.OrthogonalProductPolynomialFactory([im.X1, im.X2, im.X3])
multivariateBasis
# %%
# Then we create the sparse polynomial chaos expansion using regression and
# the LARS selection model method.
# %%
selectionAlgorithm = ot.LeastSquaresMetaModelSelectionFactory()
projectionStrategy = ot.LeastSquaresStrategy(selectionAlgorithm)
totalDegree = 8
enumerateFunction = multivariateBasis.getEnumerateFunction()
basisSize = enumerateFunction.getBasisSizeFromTotalDegree(totalDegree)
adaptiveStrategy = ot.FixedStrategy(multivariateBasis, basisSize)
chaosAlgo = ot.FunctionalChaosAlgorithm(
inputTrain, outputTrain, im.inputDistribution, adaptiveStrategy, projectionStrategy
)
chaosAlgo.run()
chaosResult = chaosAlgo.getResult()
chaosResult
# %%
# Get the metamodel.
metamodel = chaosResult.getMetaModel()
print(metamodel)
# %%
class SuperPoint():
def __init__(self, point):
self.point = point
def toPython(self, variableName="", indentation=""):
dimension = self.point.getDimension()
code = f"{indentation}"
if variableName != "":
code += f"{variableName} = "
code += "Point(["
for i in range(dimension):
code += f"{self.point[i]}"
if i < dimension - 1:
code += ", "
code += "])"
return code
# %%
point = im.inputDistribution.getRealization()
superPoint = SuperPoint(point)
print("Point. To Python:")
print(superPoint.toPython())
print(superPoint.toPython("p"))
print(superPoint.toPython("p", " "))
# %%
class SuperSample():
def __init__(self, sample):
self.sample = sample
def toPython(self, variableName="", indentation=""):
code = f"{indentation}"
if variableName != "":
code += f"{variableName} = "
size = self.sample.getSize()
dimension = self.sample.getDimension()
code += "Sample(["
for i in range(size):
code += "["
for j in range(dimension):
code += f"{self.sample[i, j]}"
if j < dimension - 1:
code += ", "
code += "]"
if i < size - 1:
code += ", "
code += "])"
return code
# %%
superInputSample = SuperSample(inputTrain)
print("Sample. To Python:")
print(superInputSample.toPython())
print(superInputSample.toPython("s"))
print(superInputSample.toPython("s", " "))
# %%
class SuperDistribution():
def __init__(self, distribution):
self.distribution = distribution
def toPython(self, variableName="", indentation=""):
dimension = self.distribution.getDimension()
if dimension == 1:
code = self._toPython1d(variableName, indentation)
else:
code = self._toPythonNd(variableName, indentation)
return code
def _toPython1d(self, variableName="", indentation=""):
code = f"{indentation}"
if variableName != "":
code += f"{variableName} = "
className = self.distribution.getImplementation().getClassName()
parameter = self.distribution.getParameter()
code += f"{className}("
# Do not use SuperPoint.toPython() directly, because it involves
# an unwanted conversion to Point and unwanted square brackets
paramDimension = parameter.getDimension()
for i in range(paramDimension):
code += f"{parameter[i]}"
if i < paramDimension - 1:
code += ", "
code += ")"
return code
def _toPythonNd(self, variableName="", indentation=""):
copula = self.distribution.getCopula()
copulaName = copula.getImplementation().getClassName()
if copulaName != "IndependentCopula":
raise ValueError(f"Non independent copula {copulaName} is not implemented yet")
className = self.distribution.getClassName()
if className == "Distribution":
className = self.distribution.getImplementation().getClassName()
if className == "JointDistribution":
code = f"{indentation}marginals = []\n"
dimension = self.distribution.getDimension()
for i in range(dimension):
marginal = self.distribution.getMarginal(i)
superMarginal = SuperDistribution(marginal)
code += f"{indentation}marginals.append({superMarginal.toPython()})\n"
if variableName != "":
code += f"{indentation}{variableName} = JointDistribution(marginals)\n"
else:
code += f"{indentation}JointDistribution(marginals)\n"
else:
raise ValueError(f"Distribution {className} is not implemented yet")
return code
# %%
superMarginalDistribution = SuperDistribution(im.inputDistribution.getMarginal(0))
print("To Python:")
print(superMarginalDistribution.toPython())
print(superMarginalDistribution.toPython("d"))
print(superMarginalDistribution.toPython("d", " "))
# %%
superDistribution = SuperDistribution(im.inputDistribution)
print("To Python:")
print(superDistribution.toPython())
print(superDistribution.toPython("d"))
print(superDistribution.toPython("d", " "))
# %%
distribution = chaosResult.getDistribution()
superDistribution = SuperDistribution(distribution)
print("To Python:")
print(superDistribution.toPython("d"))
# %%
class SuperChaosResult():
def __init__(self, functionalChaosResult):
self.functionalChaosResult = functionalChaosResult
def toPython(self):
coefficients = self.functionalChaosResult.getCoefficients()
code = ""
code = "from openturns import *\n"
code += "def pceModel(X):\n"
code += " # Define the distribution\n"
# Serialize the input distribution
distribution = self.functionalChaosResult.getDistribution()
superDistribution = SuperDistribution(distribution)
distributionCode = superDistribution.toPython("distribution", " ")
code += f"{distributionCode}"
# Get the basis
indices = self.functionalChaosResult.getIndices()
code += f" # Set the indices\n"
code += f" indices = {indices}\n"
# Serialize the coefficients
coefficients = self.functionalChaosResult.getCoefficients()
coeffCode = SuperSample(coefficients).toPython()
code += " # Define the coefficients\n"
code += f" coefficients = {coeffCode}\n"
metamodelCode = """
# Set the basis
inputDimension = distribution.getDimension()
polynomials = PolynomialFamilyCollection(inputDimension)
for i in range(inputDimension):
marginalPolynomial = StandardDistributionPolynomialFactory(marginals[i])
polynomials[i] = marginalPolynomial
enumerate = LinearEnumerateFunction(inputDimension)
basis = OrthogonalProductPolynomialFactory(polynomials, enumerate)
# Set the function collection
function_collection = []
numberOfIndices = len(indices)
for i in range(numberOfIndices):
function_collection.append(basis.build(indices[i]))
# Set the composed metamodel, set the transformation, create the metamodel
composedMetaModel = DualLinearCombinationFunction(function_collection, coefficients)
measure = basis.getMeasure()
transformation = DistributionTransformation(distribution, measure)
metaModel = ComposedFunction(composedMetaModel, transformation)
# Evaluate the metamodel
Y = metaModel(X)
"""
code += metamodelCode
code += " return Y\n"
return code
# %%
superPCE = SuperChaosResult(chaosResult)
print("Python code:")
pythonCode = superPCE.toPython()
print(pythonCode)
# %%
f = open("pce_metamodel.py", "w")
f.write(pythonCode)
f.close()
# %%
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment