@ThorstenHans
Created December 4, 2024 16:09
Simple Slack Notifier for new OCI artifacts in an ACR

New OCI artifact Notifier

Prerequisites

  • An ACR instance
  • An Event Grid system topic subscribed to push events for container images and Helm charts
  • Spin (I tested with 3.0.0)
  • componentize-py (I tested with 0.16.0)
  • A Slack app with incoming webhooks activated (you need the webhook URL)

Build and Deploy

spin b

spin cloud deploy --variable slack_webhook_url=<YOUR_SLACK_WEBHOOK_URL>
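
Once the app is running, the endpoint can be smoke-tested by posting a sample event to it. The following is only a sketch under assumptions and is not part of the original gist: `APP_URL` assumes a local `spin up` on its usual default address, the helper names are hypothetical, and the registry, artifact, and digest values are made-up placeholders shaped after the fields the handler reads (`subject`, `data.request.host`, `data.target.digest`).

```python
import json
import urllib.request

# Assumed endpoint (usual `spin up` default); replace with your own app URL.
APP_URL = "http://127.0.0.1:3000/"


def build_sample_event() -> dict:
    """Return a structured-mode CloudEvent filled with placeholder values."""
    return {
        "specversion": "1.0",
        "id": "00000000-0000-0000-0000-000000000000",
        "source": "/placeholder/registries/myregistry",
        "type": "Microsoft.ContainerRegistry.ImagePushed",
        "subject": "hello-world:1.0.0",
        "time": "2024-12-04T16:09:00Z",
        "data": {
            "action": "push",
            "target": {
                "repository": "hello-world",
                "tag": "1.0.0",
                "digest": "sha256:" + "0" * 64,
            },
            "request": {"host": "myregistry.azurecr.io", "method": "PUT"},
        },
    }


def build_request(url: str, event: dict) -> urllib.request.Request:
    """Wrap the event in a POST request with a JSON body."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"content-type": "application/cloudevents+json"},
        method="POST",
    )


if __name__ == "__main__":
    # Sends the sample event; a 200 status suggests the handler accepted it.
    with urllib.request.urlopen(build_request(APP_URL, build_sample_event())) as resp:
        print(resp.status)
```

If the Slack webhook URL is valid, a message naming `hello-world:1.0.0` and `myregistry.azurecr.io` should show up in the configured channel.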

# app.py
import json

from cloudevents.http import from_json
from spin_sdk import variables
from spin_sdk.http import IncomingHandler, Request, Response, send


class IncomingHandler(IncomingHandler):
    def handle_request(self, request: Request) -> Response:
        print(request.method)

        # Event Grid validates a CloudEvents webhook with an OPTIONS request
        # before it starts delivering events.
        if request.method == "OPTIONS":
            origin = request.headers.get("WebHook-Request-Origin")
            rate = request.headers.get("WebHook-Request-Rate")
            if origin is None:
                origin = "*"
            if rate is None:
                rate = 120
            print(rate)
            return Response(
                200,
                {
                    "allow": "POST",
                    "webhook-allowed-origin": origin,
                    # Response header name per the CloudEvents webhook spec.
                    "webhook-allowed-rate": str(rate),
                },
                None,
            )

        # Values come from the [variables] section of the Spin manifest.
        slack_url = variables.get("slack_webhook_url")
        slack_channel = variables.get("slack_channel")
        slack_username = variables.get("slack_username")

        # Parse the structured-mode CloudEvent delivered by Event Grid.
        json_payload = request.body.decode("utf-8")
        event = from_json(json_payload)
        message = (
            f"New OCI artifact `{event['subject']}` pushed to "
            f"`{event.data['request']['host']}`. "
            f"Digest is `{event.data['target']['digest']}`"
        )
        print(message)

        slack_notification_payload = {
            "channel": slack_channel,
            "username": slack_username,
            "mrkdwn": True,
            "text": message,
        }
        slack_notification = Request(
            "POST",
            slack_url,
            {"content-type": "application/json"},
            bytes(json.dumps(slack_notification_payload), "utf-8"),
        )
        # Forward the notification to the Slack incoming webhook.
        send(slack_notification)

        return Response(200, {"content-type": "text/plain"}, bytes("", "utf-8"))
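
The OPTIONS branch of the handler answers the webhook validation handshake that Event Grid performs for CloudEvents endpoints. Isolated as a pure function, that logic could look roughly like the sketch below. The function name `validation_headers` and the constant `DEFAULT_RATE` are hypothetical, the lookup is made case-insensitive on the assumption that the runtime may normalize header names, and the rate header uses the `WebHook-Allowed-Rate` name from the CloudEvents webhook spec.

```python
from typing import Mapping

# Same fallback value the handler uses when no rate is requested.
DEFAULT_RATE = 120


def validation_headers(request_headers: Mapping[str, str]) -> dict:
    """Build the response headers for a webhook validation (OPTIONS) request."""
    # HTTP header names are case-insensitive, so normalize before the lookup.
    lowered = {name.lower(): value for name, value in request_headers.items()}
    origin = lowered.get("webhook-request-origin", "*")
    rate = lowered.get("webhook-request-rate", str(DEFAULT_RATE))
    return {
        "allow": "POST",
        "webhook-allowed-origin": origin,
        "webhook-allowed-rate": rate,
    }
```

Keeping the handshake free of Spin types makes it easy to unit test without building a Wasm component first.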

# requirements.txt
cloudevents==1.11.0
componentize-py==0.16.0
deprecation==2.1.0
packaging==24.2
spin-sdk==3.2.1

# spin.toml
spin_manifest_version = 2

[application]
authors = ["Thorsten Hans <[email protected]>"]
description = ""
name = "new-oci-notifier"
version = "0.1.0"

[variables]
slack_webhook_url = { required = true }
slack_channel = { default = "common" }
slack_username = { default = "Bob the Bot" }

[[trigger.http]]
route = "/..."
component = "new-oci-notifier"

[component.new-oci-notifier]
source = "app.wasm"
allowed_outbound_hosts = ["https://hooks.slack.com"]

[component.new-oci-notifier.variables]
slack_webhook_url = "{{ slack_webhook_url }}"
slack_channel = "{{ slack_channel }}"
slack_username = "{{ slack_username }}"

[component.new-oci-notifier.build]
command = "componentize-py -w spin-http componentize app -o app.wasm"
watch = ["*.py", "requirements.txt"]