Skip to content

Instantly share code, notes, and snippets.

@rawc0der
Last active December 14, 2020 13:06
Show Gist options
  • Save rawc0der/e6ef3ad44280ff44cc7c8062a4816dd7 to your computer and use it in GitHub Desktop.
Save rawc0der/e6ef3ad44280ff44cc7c8062a4816dd7 to your computer and use it in GitHub Desktop.
Rego Annotations -- Package for extracting Parameters and creating Exceptions from resource annotations (using Konstraint as core library)
package lib.annotations
# Annotations helper library used to extract policy metadata information from annotation fields
# Usage:
#
# import data.lib.annotations
# annotations = annotations.policy_annotations()
#
import data.lib.core
import data.lib.pods
default paramsAnnotationField = "konstraint.policies/parameters"
default ignoreAnnotationField = "konstraint.policies/ignore"
get_default(object, field, _default) = output {
core.has_field(object, field)
object[field] != null
output = object[field]
}
get_default(object, field, _default) = output {
core.has_field(object, field) == false
output = _default
}
policy_annotations = return {
return := object.union(
get_default(core.resource.metadata, "annotations", {}),
get_default(pods.pod.metadata, "annotations", {}))
} else = return {
return := get_default(pods.pod.metadata, "annotations", {})
return != null
} else = return {
return := get_default(core.resource.metadata, "annotations", {})
return != null
} else = return {
return := {}
}
kind: Deployment
metadata:
name: MyDeployment
spec:
....
template:
metadata:
annotations:
konstraint.policies/parameters.P0001: '{"secret": "my-secret-value"}'
konstraint.policies/ignore: "P0001"
### Passing parameters and exception switch in the resource annotations
package lib.exceptions
# Using resource wrapper logic to obtain exceptions metadata from annotations
# Usage:
#
# import data.lib.exceptions
# policyID = ...
#
# violation[{"msg": msg}] {
# not exceptions.is_exception(policyID)
# ...
# }
#
import data.lib.core
import data.lib.parameters
import data.lib.annotations
is_exception(policyID) = true {
annotations_object := annotations.policy_annotations()
core.has_field(annotations_object, annotations.ignoreAnnotationField)
annotations_object[annotations.ignoreAnnotationField] != null
ignoreList := split(annotations_object[annotations.ignoreAnnotationField],",")
ignoreList[_] == policyID
}
package lib.parameters
# Using wrapper logic to obtain extra parameters from annotations
# Match by policyID and merge default parameters passed as data object with extra parameters from annotations
#
# import data.lib.parameters
# policyID = ...
#
# parameters = parameters.parameters(policyID)
#
import data.lib.core
import data.lib.pods
import data.lib.annotations
parameters(policyID) = params {
params := object.union(core.parameters[policyID], extra_parameters(policyID))
}
extra_parameters(policyID) = params {
meta_annotations := annotations.policy_annotations()
policyAnnotationField := sprintf("%s.%s", [annotations.paramsAnnotationField, policyID])
core.has_field(meta_annotations, policyAnnotationField)
params := json.unmarshal(meta_annotations[policyAnnotationField])
} else = params {
params := {}
}
import data.lib.exceptions
import data.lib.parameters
policyID = "P0001"
violation[{"msg": msg}] {
not exceptions.is_exception(policyID)
secret_value = parameters.parameters(policyID).secret
secret_value != "not-so-secret-value"
}
### Reading parameters from annotations
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment