Last active
December 16, 2020 16:10
-
-
Save brandond/20013b62498357a7d553230570216341 to your computer and use it in GitHub Desktop.
Sample Menlo Security logging API client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging | |
import requests | |
import click | |
import itertools | |
from dateutil.parser import isoparse | |
from datetime import datetime, timezone | |
from time import time | |
try: | |
import ujson as json | |
except ImportError: | |
import json | |
# Endpoint that every request in this module is POSTed to.
URL = 'https://logs.menlosecurity.com/api/rep/v1/fetch/client_select'

# One shared HTTP session for all requests made by this module.
SESSION = requests.Session()

# Module-level logger; handlers and level are configured by the entry point.
logger = logging.getLogger(__name__)
def _utc_naive(epoch_seconds):
    """Return a naive UTC datetime for an epoch timestamp (used for log messages only)."""
    # Same rendering as the deprecated datetime.utcfromtimestamp(), without the deprecation.
    return datetime.fromtimestamp(epoch_seconds, timezone.utc).replace(tzinfo=None)


@click.command(context_settings={'max_content_width': 120, 'ignore_unknown_options': True})
@click.option('--api-key', type=str, required=True)
@click.option('--start', type=int)
# A callable default is evaluated when the command runs, not once when the module is imported.
@click.option('--end', type=int, default=lambda: int(time()))
@click.option('--step', type=int, default=60)
@click.option('--step-limit', type=int, default=0)
@click.option('--format', type=click.Choice(['JSON', 'KVP', 'CEF', 'LEEF']), default='JSON')
@click.option('--type', type=click.Choice(['web', 'audit', 'email', 'attachment', 'smtp']), default='web')
@click.pass_context
def cli(ctx, api_key, start, end, step, step_limit, format, type):
    """Fetch events in fixed-size time windows and print one event per line.

    START and END are epoch seconds. When START is omitted (or 0) the oldest
    available event is looked up and used as the starting point. STEP is the
    window size in seconds; a STEP-LIMIT greater than 0 caps the number of
    windows fetched.
    """
    # NOTE: click uses the docstring above as the command's --help text.
    if not start:
        start = get_start_timestamp(api_key=api_key, type=type)

    logger.info(f"Getting {type} events in {step} second windows from {_utc_naive(start)} to {_utc_naive(end)}")

    step_count = 0
    for chunk_start in itertools.count(start=start, step=step):
        # Clamp the final window to `end`; an empty (or inverted) window means we are done.
        chunk_end = min(chunk_start + step, end)
        if chunk_end <= chunk_start:
            break
        if step_limit > 0 and step_count >= step_limit:
            break

        logger.info(f"Getting {type} events from {_utc_naive(chunk_start)} to {_utc_naive(chunk_end)}")
        step_count += 1

        # get_logs() updates this dict in place with the identifiers for the next page.
        page = {}
        while True:
            try:
                events = get_logs(api_key=api_key, start=chunk_start, end=chunk_end,
                                  format=format, type=type, page=page)
            except Exception as exc:
                # Best effort: keep going with the next window, but never hide the failure,
                # otherwise a failed request is indistinguishable from "no more events".
                logger.warning(f"Failed to get {type} events from {chunk_start} to {chunk_end}, "
                               f"skipping the rest of this window: {exc!r}")
                events = []

            if not events:
                break

            logger.info(f"Got {len(events)} events in page")
            for k, v in page.items():
                logger.info(f"\t{k}: {v}")
            for event in events:
                if format == 'JSON':
                    print(json.dumps(event['event']))
                else:
                    print(event['event'])
def get_start_timestamp(api_key, type):
    """Return the epoch second from which log windows can safely be requested.

    Asks the API for a single event starting at time 0 (i.e. the oldest one
    available), truncates that event's timestamp down to a whole minute and
    adds a 120 second safety margin.
    """
    asked_at = datetime.now(timezone.utc)
    body = {'token': api_key, 'log_type': type}
    query = {'start': 0, 'limit': 1}

    response = SESSION.post(url=URL, json=body, params=query)
    response.raise_for_status()

    oldest = isoparse(response.json()[0]['timestamp'])
    logger.info(f"Oldest {type} event timestamp is {oldest} - {asked_at - oldest}")

    # Truncate to the start of the minute the oldest event falls in.
    minute_floor = int(oldest.timestamp() / 60) * 60

    # The oldest logs are actively being purged from the backend, which breaks pagination: the API
    # keeps returning the same logs over and over with different pagination indexes. Skipping the
    # first 120 seconds of logs avoids that.
    return minute_floor + 120
def get_logs(api_key, start, end, limit=1000, format='JSON', type='web', page=None):
    """Fetch one page of events for the window [start, end].

    Args:
        api_key: API token sent in the request body.
        start: Window start, passed as the `start` query parameter.
        end: Window end, passed as the `end` query parameter.
        limit: Maximum number of events requested per page.
        format: Event format requested from the API.
        type: Log type requested from the API.
        page: Dict of pagingIdentifiers from the previous call. It is updated
            in place with the identifiers for the next page, so pass the same
            dict on every call to walk through a window. When omitted, a fresh
            dict is used and paging starts from the beginning.

    Returns:
        The list of events in this page (empty when there are none).

    Raises:
        requests.HTTPError: If the API responds with an error status.
    """
    # A `{}` default would be created once and shared by every call that omits `page`;
    # since it is mutated below, paging state would leak from one call into the next.
    if page is None:
        page = {}

    response = SESSION.post(url=URL,
                            json={'token': api_key, 'log_type': type, 'pagingIdentifiers': page},
                            params={'start': start, 'end': end, 'limit': limit, 'format': format})
    response.raise_for_status()

    payload = response.json()
    if not payload:
        # NOTE(review): an empty result list is treated as "no events" rather than raising
        # IndexError - confirm this matches what the API sends for an empty window.
        page.clear()
        return []

    result = payload[0]['result']
    page.clear()
    page.update(result['pagingIdentifiers'])
    return result['events']
if __name__ == '__main__':
    # Log bare messages at INFO and above, then hand control to the click command.
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment