Created
November 18, 2022 23:01
-
-
Save purple4reina/525d228eda6ede52d9c75dbeff3069af to your computer and use it in GitHub Desktop.
Datadog agent in customer code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the agent interfacer code which can be used to start/stop/flush a Datadog agent | |
import pkg_resources | |
import requests | |
import subprocess | |
import time | |
from ddtrace import tracer as ddtracer | |
class _singleton(type): | |
_instances = {} | |
def __call__(cls, *args, **kwargs): | |
if cls not in cls._instances: | |
cls._instances[cls] = super(_singleton, cls).__call__(*args, **kwargs) | |
return cls._instances[cls] | |
class Agent(metaclass=_singleton): | |
binary_path = pkg_resources.resource_filename( | |
'datadog_agent', 'includes/datadog-agent') | |
agent_addr = 'http://localhost:8127' | |
agent_ready_path = f'{agent_addr}/ready' | |
flush_traces_path = f'{agent_addr}/trace/flush' | |
flush_metrics_path = f'{agent_addr}/metrics/flush' | |
ready = False | |
tracer = ddtracer | |
def init(self): | |
subprocess.Popen([self.binary_path, 'grep', 'hello']) | |
def flush(self): | |
flushed_traces = self.flush_traces() | |
flushed_metrics = self.flush_metrics() | |
return flushed_traces and flushed_metrics | |
def flush_traces(self): | |
return self._flush_agent(self.flush_traces_path) | |
def flush_metrics(self): | |
return self._flush_agent(self.flush_metrics_path) | |
def _flush_agent(self, path): | |
flush_attempts = 3 | |
while flush_attempts: | |
try: | |
resp = requests.post(path) | |
if resp.ok: | |
return True | |
except Exception: | |
pass | |
flush_attempts -= 1 | |
if flush_attempts: | |
time.sleep(1) | |
return False | |
def is_ready(self): | |
if self.ready: | |
return True | |
try: | |
resp = requests.get(self.agent_ready_path) | |
if not resp.ok: | |
return False | |
except Exception: | |
return False | |
self.ready = resp.json()["ready"] | |
return self.ready | |
agent = Agent() | |
def init(): | |
agent.init() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the customer application | |
#!/usr/bin/env python | |
# This script is run on cron once per day | |
# | |
# If there is a weather advisory for the given US state, then post the advisory | |
# in slack and send an email to the administrators. | |
####################### | |
# 1. INITIALIZE AGENT # | |
####################### | |
import ddtrace | |
ddtrace.patch_all() | |
import datadog_agent | |
datadog_agent.init() | |
import requests | |
import emails | |
import slack | |
STATE = 'OR' | |
ADMIN_EMAILS = '[email protected]' | |
SLACK_CHANNEL = '#weather-advisories' | |
###################### | |
# 2. WRAP ENTRYPOINT # | |
###################### | |
@datadog_agent.wrap | |
def main(event): | |
state = 'OR' | |
advisories = get_weather_advisories() | |
if advisories: | |
advisories_txt = format_advisories(advisories) | |
email_admin(advisories_txt) | |
post_to_slack(advisories_txt) | |
_weather_advisories_url = f'https://api.weather.gov/alerts/active?area={STATE}' | |
def get_weather_advisories(): | |
resp = requests.get(_weather_advisories_url) | |
resp.raise_for_status() | |
return resp.json()['features'] | |
def format_advisories(advisories): | |
adv_txts = '\n'.join( | |
f' ADVISORY: {a["properties"]["event"]} in {a["properties"]["areaDesc"]}' | |
for a in advisories | |
) | |
return f'Weather advisories found for {STATE}:\n{adv_txts}' | |
_email_from = '[email protected]' | |
_email_subject = 'Weather advisories' | |
def email_admin(advisories): | |
emails.send_email(ADMIN_EMAILS, _email_from, _email_subject, advisories) | |
def post_to_slack(advisories): | |
slack.post_to_channel(advisories, SLACK_CHANNEL) | |
if __name__ == '__main__': | |
main({}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// These are the endpoints that are added to the serverless-init binary, it lives at cmd/serverless-init/endpoints.go | |
package main | |
import ( | |
"log" | |
"net/http" | |
"sync" | |
"time" | |
"github.com/DataDog/datadog-agent/pkg/serverless/metrics" | |
"github.com/DataDog/datadog-agent/pkg/serverless/trace" | |
) | |
const serverAddr = ":8127" | |
var ready = struct { | |
sync.Mutex | |
areYou bool | |
done chan struct{} | |
}{done: make(chan struct{}, 1)} | |
func waitTillReady(timeout time.Duration) bool { | |
ready.Lock() | |
defer ready.Unlock() | |
if ready.areYou { | |
return true | |
} | |
select { | |
case <-ready.done: | |
ready.areYou = true | |
case <-time.After(timeout): | |
} | |
return ready.areYou | |
} | |
func setupEndpoints(traceAgent *trace.ServerlessTraceAgent, metricAgent *metrics.ServerlessMetricAgent) { | |
mux := http.NewServeMux() | |
mux.Handle("/trace/flush", newFlushHandler(traceAgent)) | |
mux.Handle("/metrics/flush", newFlushHandler(metricAgent)) | |
mux.HandleFunc("/ready", readyHandler) | |
go log.Fatal(http.ListenAndServe(serverAddr, mux)) | |
ready.done <- struct{}{} | |
} | |
type flusher interface { | |
Flush() | |
} | |
func newFlushHandler(agent flusher) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodPost { | |
w.WriteHeader(http.StatusNotFound) | |
return | |
} | |
agent.Flush() | |
w.WriteHeader(http.StatusOK) | |
}) | |
} | |
func readyHandler(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodGet { | |
w.WriteHeader(http.StatusNotFound) | |
return | |
} | |
switch waitTillReady(5 * time.Second) { | |
case true: | |
w.Write([]byte(`{"ready":"true"}`)) | |
case false: | |
w.Write([]byte(`{"ready":"false"}`)) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the code the customer can use to start/flush the agent with each function call | |
import time | |
from .agent import agent | |
class wrap(object): | |
def __init__(self, fn=None): | |
self.fn = fn | |
def __enter__(self): | |
while True: | |
if agent.is_ready(): | |
break | |
time.sleep(0.1) | |
self.span = agent.tracer.trace('hello world') | |
return agent | |
def __exit__(self, exc_type, exc_value, traceback): | |
self.span.finish() | |
while not agent.flush_traces(): | |
pass | |
def __call__(self, *args, **kwargs): | |
with self: | |
return self.fn(*args, **kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment