Last active: July 12, 2019 10:01
-
-
Save butlerx/a007ecdf41e5581aec9e7b14101566d8 to your computer and use it in GitHub Desktop.
Async Graphite tree walker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import argparse
import asyncio
import logging
from asyncio import AbstractEventLoop, ensure_future, gather, get_event_loop, sleep
from collections import deque
from typing import Deque, List, Optional

import structlog
from aiohttp import BasicAuth, ClientError, ClientSession, ContentTypeError
# structlog hands its events to stdlib loggers (LoggerFactory); the final
# rendering is done by a ProcessorFormatter on a handler attached to the root
# logger, using structlog's console renderer.
_shared_processors = [
    structlog.stdlib.add_log_level,
    structlog.processors.format_exc_info,
    structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
]
structlog.configure(
    processors=_shared_processors,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

formatter = structlog.stdlib.ProcessorFormatter(
    processor=structlog.dev.ConsoleRenderer(),
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)

# Root logger: everything at INFO and above goes to the stream handler above.
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)
class Branch:
    """A node of the Graphite metrics tree that is waiting to be walked.

    ``prefix`` is the dotted metric path of the node (the literal ``"*"``
    denotes the top of the tree), ``depth`` is how many levels below the
    starting branch it sits, and ``attempts`` counts how many times a fetch
    of this branch has been tried.
    """

    def __init__(self, prefix: str, depth: int):
        self.attempts = 0
        self.prefix, self.depth = prefix, depth
class Walker:
    """Asynchronous, round-based (roughly breadth-first) Graphite tree walker.

    Branches are held in ``self.queue``. :meth:`run` repeatedly drains the
    queue and queries ``<url>/metrics/find`` (``format=treejson``) for every
    drained branch concurrently. Leaf nodes, and non-leaf nodes that reach
    ``max_depth``, are collected in ``self.metrics_paths``. Every other node
    is queued as a child branch for the next round.
    """

    def __init__(
        self,
        url: str,
        file: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        series_from: Optional[str] = None,
        loop: Optional[AbstractEventLoop] = None,
        max_depth: Optional[int] = None,
    ):
        """
        Args:
            url: Base Graphite URL; ``/metrics/find`` is appended to it.
            file: Path that :meth:`show_tree` writes the sorted metric list to.
            user: Basic-auth username. Auth is only sent when both ``user``
                and ``password`` are given.
            password: Basic-auth password.
            series_from: If set, sent as the ``from`` query parameter.
            loop: Kept for backward compatibility only. ``asyncio.sleep`` and
                ``asyncio.gather`` no longer accept an explicit loop (removed
                in Python 3.10), so the value is stored but not used.
            max_depth: If not ``None``, non-leaf nodes whose depth reaches this
                value are recorded as metrics instead of being walked further.
        """
        self.queue: Deque[Branch] = deque()
        self.url = url
        self.user = user
        self.password = password
        self.series_from = series_from
        self.enabled = False
        self.max_attempts = 3
        # Not passed to any asyncio API any more; see the docstring.
        self.loop = loop
        self._dispatch_task = None
        # Shared HTTP session, only set while run() is active.
        self._session: Optional[ClientSession] = None
        self.logger = structlog.get_logger("walker")
        self.max_depth = max_depth
        self.metrics_paths: List[str] = []
        self.output_file = file

    async def run(self):
        """Walk the tree until the queue is empty or ``enabled`` is cleared.

        Each round dispatches every queued branch concurrently. It then pauses
        for one second before the next round to limit load on Graphite. One
        HTTP session is shared by all requests of the run, so connections are
        reused instead of being re-established for every branch.
        """
        self.logger.debug("processing event queue")
        self.enabled = True
        async with ClientSession() as session:
            self._session = session
            try:
                while self.enabled and self.queue:
                    await self._dispatch()
                    if self.queue:
                        await sleep(1)
            finally:
                self._session = None

    async def _dispatch(self):
        """Drain the queue and walk every drained branch concurrently.

        Returns the list of ``_walk`` results, or ``None`` if the queue was
        empty.
        """
        tasks = []
        while self.queue:
            tasks.append(self._walk(self.queue.popleft()))
        if not tasks:
            return None
        return await gather(*tasks)

    async def _format_payload(self, task: Branch):
        """Build the ``metrics/find`` query parameters for *task*."""
        payload = {
            # "*" is already a complete query; any other prefix gets ".*".
            "query": f"{task.prefix}.*" if task.prefix != "*" else task.prefix,
            "format": "treejson",
        }
        if self.series_from:
            payload["from"] = self.series_from
        return payload

    async def _walk(self, task: Branch):
        """Fetch one branch, then record its leaves and queue its children.

        A branch is tried at most ``max_attempts`` times. HTTP 429 responses
        and transport errors re-queue it at the front of the queue. Any other
        HTTP error status records the branch prefix itself as a metric.
        """
        task.attempts += 1
        if task.attempts > self.max_attempts:
            self.logger.error(
                "giving up on branch after repeated failures",
                branch=task.prefix,
                depth=task.depth,
                attempts=self.max_attempts,
            )
            return
        if self._session is None:
            # Called outside run(): fall back to a short-lived session.
            async with ClientSession() as session:
                await self._walk_with_session(session, task)
        else:
            await self._walk_with_session(self._session, task)

    async def _walk_with_session(self, session: ClientSession, task: Branch):
        """Issue the ``metrics/find`` request for *task* and handle the reply."""
        self.logger.debug("Walking Branch", branch=task.prefix, depth=task.depth)
        auth = (
            BasicAuth(login=self.user, password=self.password)
            if self.user is not None and self.password is not None
            else None
        )
        metrics = []
        try:
            # The context manager releases the connection on every path,
            # including error statuses whose body is never read.
            async with session.get(
                f"{self.url.rstrip('/')}/metrics/find",
                params=await self._format_payload(task),
                auth=auth,
            ) as result:
                status = result.status
                if status < 400:
                    try:
                        metrics = await result.json()
                    except (ContentTypeError, ValueError):
                        self.logger.error(
                            "failed to decode json",
                            branch=task.prefix,
                            depth=task.depth,
                        )
                        metrics = []
        except (ClientError, asyncio.TimeoutError) as err:
            # One failed request must not abort the whole walk; retry it.
            self.logger.error(
                "request failed, retrying",
                branch=task.prefix,
                depth=task.depth,
                error=str(err),
            )
            self.queue.appendleft(task)
            return

        if status == 429:
            self.logger.error(
                "requesting too many metrics backing off",
                branch=task.prefix,
                depth=task.depth,
                status_code=status,
            )
            await sleep(60)
            self.queue.appendleft(task)
        elif status >= 400:
            self.logger.error(
                "Failed to walk branch",
                branch=task.prefix,
                depth=task.depth,
                status_code=status,
            )
            await self._store_metric(task.prefix)
        else:
            await self._handle_nodes(task, metrics)

    async def _handle_nodes(self, task: Branch, metrics):
        """Record leaf/depth-limited nodes of *task* and queue the rest."""
        if not isinstance(metrics, list):
            self.logger.error(
                "unexpected json payload", branch=task.prefix, depth=task.depth
            )
            return
        children = []
        next_depth = task.depth + 1
        for metric in metrics:
            if metric["leaf"]:
                await self._store_metric(metric["id"])
            elif self.max_depth is not None and next_depth >= self.max_depth:
                # Explicit comparison so that max_depth=0 is a real limit, not
                # "unlimited".
                await self._store_metric(metric["id"])
            else:
                children.append(Branch(metric["id"], next_depth))
        self.queue.extend(children)

    async def _store_metric(self, metric: str):
        """Record *metric* as a discovered path."""
        self.metrics_paths.append(metric)
        self.logger.debug("found metric", metric=metric)

    async def enqueue(self, branch: Branch):
        """Add *branch* to the back of the work queue."""
        self.logger.info("adding branch", prefix=branch.prefix, depth=branch.depth)
        self.queue.append(branch)

    def show_tree(self):
        """Sort the collected paths in place and write them, one per line."""
        self.metrics_paths.sort()
        with open(self.output_file, "w", encoding="utf-8") as out:
            out.writelines(f"{metric}\n" for metric in self.metrics_paths)
async def main(argv: Optional[List[str]] = None):
    """Command-line entry point: walk a Graphite tree and write the metric list.

    Args:
        argv: Arguments to parse instead of ``sys.argv[1:]``. ``None`` (the
            default) keeps the normal command-line behaviour. Passing a list
            lets other scripts or tests drive the entry point.
    """
    parser = argparse.ArgumentParser(
        description="Walk a Graphite metrics tree and write every metric path to a file."
    )
    parser.add_argument("--url", "-u", help="Graphite URL", required=True)
    parser.add_argument(
        "--prefix", "-p", help="Metrics prefix", required=False, default="*"
    )
    parser.add_argument("--user", help="Basic Auth username", required=False)
    # NOTE: a password given on the command line is visible in the process list.
    parser.add_argument("--password", help="Basic Auth password", required=False)
    parser.add_argument(
        "--from",
        dest="seriesFrom",
        help="only get series that have been active since this time",
        required=False,
    )
    parser.add_argument(
        "--depth",
        "-d",
        type=int,
        help=(
            "maximum depth to traverse. If set, branches reaching this depth "
            "are written out instead of being walked further"
        ),
        required=False,
    )
    parser.add_argument(
        "--file",
        "-f",
        type=str,
        help="file to write metric tree to",
        required=False,
        default="metrics_tree.txt",
    )
    args = parser.parse_args(argv)
    walker = Walker(
        args.url,
        user=args.user,
        password=args.password,
        series_from=args.seriesFrom,
        file=args.file,
        max_depth=args.depth,
    )
    # The requested prefix is the starting branch, at depth 0.
    await walker.enqueue(Branch(args.prefix, 0))
    await walker.run()
    walker.show_tree()
if __name__ == "__main__":
    import asyncio

    try:
        # For production, use libuv as our eventloop as it is *FAR*
        # more performant than the default asyncio event loop.
        # https://magic.io/blog/uvloop-blazing-fast-python-networking/
        from uvloop import EventLoopPolicy
    except ImportError:
        # uvloop is optional; fall back to the default asyncio event loop.
        pass
    else:
        asyncio.set_event_loop_policy(EventLoopPolicy())
        root_logger.info("Using uvloop for asyncio")

    try:
        # asyncio.run creates a fresh loop through the active policy and
        # closes it afterwards. Calling get_event_loop() with no running loop
        # is deprecated.
        asyncio.run(main())
    except KeyboardInterrupt:
        root_logger.info("Shutting down application")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment