@brandond · Last active December 16, 2020 16:10
Sample Menlo Security logging API client
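
A small command-line client that polls the Menlo Security log API in fixed time windows and prints each event to stdout. It supports the web, audit, email, attachment, and smtp log types, with output in JSON, KVP, CEF, or LEEF format; if no --start is given, it begins at the oldest event still available on the backend.
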
#!/usr/bin/env python
import logging
import requests
import click
import itertools
from dateutil.parser import isoparse
from datetime import datetime, timezone
from time import time
try:
    import ujson as json
except ImportError:
    import json
logger = logging.getLogger(__name__)
SESSION = requests.Session()
URL = 'https://logs.menlosecurity.com/api/rep/v1/fetch/client_select'
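
# All requests go to a single fetch endpoint; the API token, log type, and
# pagination state are sent in the JSON body, while the time window, page
# size, and output format are sent as query parameters (see get_logs below).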
@click.command(context_settings={'max_content_width': 120, 'ignore_unknown_options': True})
@click.option('--api-key', type=str, required=True)
@click.option('--start', type=int)
@click.option('--end', type=int, default=time())
@click.option('--step', type=int, default=60)
@click.option('--step-limit', type=int, default=0)
@click.option('--format', type=click.Choice(['JSON', 'KVP', 'CEF', 'LEEF']), default='JSON')
@click.option('--type', type=click.Choice(['web', 'audit', 'email', 'attachment', 'smtp']), default='web')
@click.pass_context
def cli(ctx, api_key, start, end, step, step_limit, format, type):
    if not start:
        start = get_start_timestamp(api_key=api_key, type=type)
    logger.info(f"Getting {type} events in {step} second windows from {datetime.utcfromtimestamp(start)} to {datetime.utcfromtimestamp(end)}")
    step_count = 0
    for chunk_start in itertools.count(start=start, step=step):
        chunk_end = chunk_start + step
        if chunk_end > end:
            chunk_end = end
        if chunk_end <= chunk_start:
            break
        if step_limit > 0 and step_count >= step_limit:
            break
        logger.info(f"Getting {type} events from {datetime.utcfromtimestamp(chunk_start)} to {datetime.utcfromtimestamp(chunk_end)}")
        step_count += 1
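        # Page through this window until the API returns an empty event list;
        # get_logs() updates `page` in place with the next pagingIdentifiers.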
        page = {}
        while True:
            try:
                events = get_logs(api_key=api_key, start=chunk_start, end=chunk_end, format=format, type=type, page=page)
            except Exception:
                # Treat a failed request as the end of this window rather than aborting the run
                logger.exception(f"Error fetching {type} events")
                events = []
            if events:
                logger.info(f"Got {len(events)} events in page")
                for k, v in page.items():
                    logger.info(f"\t{k}: {v}")
                for event in events:
                    if format == 'JSON':
                        print(json.dumps(event['event']))
                    else:
                        print(event['event'])
            else:
                break

def get_start_timestamp(api_key, type):
    """Find the current start of available logs by asking for the single oldest event"""
    request_ts = datetime.now(timezone.utc)
    response = SESSION.post(url=URL,
                            json={'token': api_key, 'log_type': type},
                            params={'start': 0, 'limit': 1})
    response.raise_for_status()
    r_json = response.json()[0]
    timestamp = isoparse(r_json['timestamp'])
    logger.info(f"Oldest {type} event timestamp is {timestamp} - {request_ts - timestamp}")
    start_ts = int(timestamp.timestamp() / 60) * 60  # round down to the nearest minute
    # Pagination breaks if you try to iterate over the oldest set of logs, since they're actively being
    # purged from the backend. It will keep sending you the same logs over and over again, with different
    # pagination indexes. Work around this by skipping the first 120 seconds of logs.
    return start_ts + 120

def get_logs(api_key, start, end, limit=1000, format='JSON', type='web', page=None):
    """Get logs for a given window. The `page` dict will be updated in-place with pagingIdentifiers for the next iteration."""
    if page is None:
        page = {}  # avoid a shared mutable default; callers pass their own dict to carry pagination state
    response = SESSION.post(url=URL,
                            json={'token': api_key, 'log_type': type, 'pagingIdentifiers': page},
                            params={'start': start, 'end': end, 'limit': limit, 'format': format})
    response.raise_for_status()
    r_json = response.json()[0]
    page.clear()
    page.update(r_json['result']['pagingIdentifiers'])
    return r_json['result']['events']
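
# Example usage (hypothetical file name and API token; the flags map directly
# to the click options defined above):
#   python menlo_client.py --api-key MY-API-KEY --step 60 --step-limit 10
#   python menlo_client.py --api-key MY-API-KEY --start 1608048000 --end 1608051600 --format CEF --type audit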

if __name__ == '__main__':
    logging.basicConfig(level='INFO', format='%(message)s')
    cli()