Created
March 17, 2021 08:37
-
-
Save saswata-dutta/6f65ebe36cad1e548711576df081e10d to your computer and use it in GitHub Desktop.
Query CloudWatch Logs Insights and dump all matching records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import sys | |
import time | |
from datetime import datetime, timezone, timedelta | |
# Client for the CloudWatch Logs API (start_query / get_query_results).
cloudwatch = boto3.client('logs')

# Log group to search and the query to run against it.
logGroupName = "Prod/application.json.log"
queryString = 'filter message like "EMAIL MATCHING:" | fields message'
# Name of the result field whose value is written to the output file.
field = "message"

# Overall time range to scan (UTC).
start = datetime(2021, 1, 29, tzinfo=timezone.utc)
end = datetime(2021, 3, 17, tzinfo=timezone.utc)

# Each query is started with limit=10000, so a single time slice can return
# at most 10k log events. Keep the slice short enough that no slice is
# expected to exceed that number, otherwise records are silently dropped.
duration = timedelta(hours=1)

# Output file; one extracted value per line. The encoding is pinned to UTF-8
# so that non-ASCII log messages are written identically on every platform
# instead of depending on the locale's default encoding.
fout = open("out.logs", 'w', encoding="utf-8")

# Running count of values written to fout.
matched = 0
##### | |
def extractFields(results):
    """Write the value of every `field` entry found in *results* to `fout`.

    *results* is iterated as a sequence of rows, each row being a sequence of
    mappings with "field" and "value" keys. Every entry whose "field" equals
    the module-level `field` has its "value" printed on its own line to
    `fout`, and the module-level `matched` counter is incremented once per
    written value.
    """
    global matched
    # Generator keeps evaluation lazy, so rows are consumed and written in
    # the same order as a pair of nested loops would.
    wanted_values = (
        cell["value"]
        for row in results
        for cell in row
        if cell["field"] == field
    )
    for value in wanted_values:
        print(value, file=fout)
        matched += 1
# Query statuses that mean "not finished yet, poll again".
PENDING_STATUS = frozenset({"Scheduled", "Running"})


def fetchResults(queryId):
    """Poll a started query until it leaves the pending state, then handle it.

    Sleeps one second before every `get_query_results` call. While the
    reported status is in `PENDING_STATUS` a "Waiting" line is printed and
    polling continues. When the status is "Complete" the results are handed
    to `extractFields`; for any other status the query id and the raw
    response are printed to stderr and the function returns normally.

    Polling is done with a loop rather than recursion so that a query that
    stays pending for a long time cannot exhaust the interpreter's recursion
    limit.

    Args:
        queryId: Identifier returned by `start_query`.
    """
    while True:
        time.sleep(1)
        response = cloudwatch.get_query_results(queryId=queryId)
        status = response["status"]
        if status not in PENDING_STATUS:
            break
        print(f"Waiting for {queryId}")

    if status == "Complete":
        results = response["results"]
        print(f"Found {len(results)} for {queryId}")
        extractFields(results)
    else:
        # Best effort: report the unexpected status and let the caller move
        # on to the next slice.
        print(queryId, response, file=sys.stderr)
def runSlice(startTime, endTime):
    """Run the configured query over one time slice and collect its results.

    Starts a query on `logGroupName` with `queryString` for the given time
    range (passed straight through as `startTime` / `endTime`) with a result
    limit of 10000, then blocks in `fetchResults` until that query has been
    handled. Progress lines are printed before and after.

    Args:
        startTime: Start of the slice, as passed to `start_query`.
        endTime: End of the slice, as passed to `start_query`.
    """
    query_args = {
        "logGroupName": logGroupName,
        "queryString": queryString,
        "startTime": startTime,
        "endTime": endTime,
        "limit": 10000,
    }
    query_id = cloudwatch.start_query(**query_args)["queryId"]

    print(f"Started {startTime} {endTime} {query_id}")
    fetchResults(query_id)
    print(f"Done {startTime} {endTime} {query_id}")
def runAll():
    """Walk the configured time range in `duration`-sized slices.

    Converts the module-level `start` and `end` to whole epoch seconds and
    calls `runSlice` for each consecutive slice. The final slice is clamped
    to `end`, so the range queried never extends past the configured end
    even when the range is not an exact multiple of `duration`.

    Raises:
        ValueError: If `duration` is shorter than one second, which would
            make the slice width zero (or negative) and the loop endless.
    """
    start_epoch = int(start.timestamp())
    end_epoch = int(end.timestamp())
    delta = int(duration.total_seconds())
    if delta <= 0:
        raise ValueError(
            f"duration must be at least one second, got {duration}"
        )

    while start_epoch < end_epoch:
        # Naive UTC datetime, printed without an offset suffix (same text
        # as the deprecated datetime.utcfromtimestamp would produce).
        slice_start = datetime.fromtimestamp(
            start_epoch, tz=timezone.utc
        ).replace(tzinfo=None)
        print(f"Processing {slice_start}")

        # Never query past the configured end of the range.
        next_epoch = min(start_epoch + delta, end_epoch)
        # NOTE(review): adjacent slices share the boundary second
        # `next_epoch`. If start_query treats endTime as inclusive, events
        # in that second may be returned twice - confirm against the API
        # documentation.
        runSlice(start_epoch, next_epoch)
        start_epoch = next_epoch
##### | |
def main():
    """Run every time slice, then report how many values were written.

    The output file `fout` is closed once all slices have been processed
    (or as soon as processing fails) so that buffered records are flushed
    to disk and the file handle is not leaked.
    """
    try:
        runAll()
    finally:
        fout.close()
    print(f"Matched = {matched}")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment