Skip to content

Instantly share code, notes, and snippets.

@philschmid
Last active October 29, 2019 10:58
Show Gist options
  • Save philschmid/7fe5017b1581cc6b183ff84c57981fa9 to your computer and use it in GitHub Desktop.
Save philschmid/7fe5017b1581cc6b183ff84c57981fa9 to your computer and use it in GitHub Desktop.
Talog Cloudwatch Alarms
#!/usr/local/bin/python3
import boto3
import time
cw_log = boto3.client('logs')
LOG_GROUP='MODULE-NAME'
LOG_STREAM='LAMBDA/DOCKER-ID'
class Talog:
"""Log Class for Cloudwatch
Methods:
__init__:
sets log_stream_name variable
sets log_group_name variable
calls create_loggroup
calls create_logstream
calls init_cloudwatch_alarm
__create_loggroup:
creates __create_loggroup if not exist
__create_logstream:
creates logstream if not exist
__init_cloudwatch_alarm:
creates logstream if not exist
__get_next_logtoken:
returns the next sequence token
log:
logs message
"""
cw_alarm_treshhold= 10
def __init__(self, log_stream_name, log_group_name):
self.log_stream_name = log_stream_name
self.log_group_name = log_group_name
self.last_log_event = {}
self.__create_loggroup()
self.__create_logstream()
self.__init_cloudwatch_alarm()
def __create_loggroup(self):
try:
cw_log.create_log_group(logGroupName=self.log_group_name)
except client.exceptions.ResourceAlreadyExistsException:
pass
print (f"logstream {self.log_stream_name} erstellt")
def __create_logstream(self):
try:
cw_log.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
except client.exceptions.ResourceAlreadyExistsException:
pass
print (f"logstream {self.log_stream_name} erstellt")
def __init_cloudwatch_alarm(self):
try:
cw_log.put_metric_alarm(
AlarmName=f"Cw_Alarm_{self.log_group_name}_{self.log_stream_name}",
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=cw_alarm_treshhold,
MetricName='IncomingLogEvents',
Namespace='AWS/Logs',
Period=60,
Statistic='Average',
Threshold=70.0,
ActionsEnabled=False,
AlarmDescription='Alarm when server CPU exceeds 70%',
Dimensions=[
{
'Name': 'LogGroupName',
'Value': self.log_group_name
},
],
Unit='Seconds'
)
except client.exceptions.ResourceAlreadyExistsException:
pass
def __get_next_logtoken(self):
cw_group = cw_log.describe_log_streams(logGroupName=self.log_group_name)
if 'uploadSequenceToken' in cw_group['logStreams'][0]:
return cw_group['logStreams'][0] ['uploadSequenceToken']
def log(self, message):
timestamp = int(round(time.time() * 1000))
log_event = {logGroupName=self.log_group_name,
logStreamName=self.log_stream_name,
logEvents=[{
'timestamp': timestamp,
'message': message
}]
if 'nextSequenceToken' in self.last_log_event:
sequence_token = self.last_log_event["nextSequenceToken"]
else :
sequence_token = self.__get_next_logtoken()
if (sequence_token):
log_event['sequenceToken']=sequence_token
# log_event.update({'sequenceToken': sequence_token})
self.last_log_event = cw_log.put_log_events(log_event)
MyNewAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: "AlarmNameGoesHere"
AlarmDescription: "Alarm if lambda errors out too many times"
Namespace: "AWS/Lambda"
MetricName: "Errors"
Dimensions:
- Name: "FunctionName"
Value: "NameOfYourLambdaFunction"
Statistic: "Sum"
ComparisonOperator: "GreaterThanThreshold"
Threshold: 0
EvaluationPeriods: 5
Period: 60
TreatMissingData: "breaching"
put_metric_alarm(
AlarmName=f"Cw_Alarm_{self.log_group_name}_{self.log_stream_name}",
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=cw_alarm_treshhold,
MetricName='KPIs',
Namespace='Some/CoolApp',
Period=60,
Statistic='Average',
Threshold=70.0,
ActionsEnabled=False,
AlarmDescription='Alarm when server CPU exceeds 70%',
Dimensions=[
{
'Name': 'PURCHASES_SERVICE',
'Value': 'CoolService'
},
{
'Name': 'APP_VERSION',
'Value': '1.0'
},
],
Unit='None'
)
## put metric data into metric can be watched by cloudwatch alarm
import boto3
# Create CloudWatch client
cloudwatch = boto3.client('cloudwatch')
# Put custom metrics
cloudwatch.put_metric_data(
MetricData = [
{
'MetricName': 'KPIs',
'Dimensions': [
{
'Name': 'PURCHASES_SERVICE',
'Value': 'CoolService'
},
{
'Name': 'APP_VERSION',
'Value': '1.0'
},
],
'Unit': 'None',
'Value': random.randint(1, 500)
},
],
Namespace='Some/CoolApp'
)

Idea

  • sdk for python and javascript
  • talog class
  • methods:
    • public init -> creates log_group + Logstream
    • pirvate create_logstream -> creates logstream after creation
    • private init_cloudwatch_alarm -> creates alarm after stream is created
    • public log -> logs something
    • private get next Log-Token

Links

How To

  • log_group is initalised only one time -> checks if it exists otherwise creates
  • log_stream is generated per lambda/docker

Talos Class

class Talog:
   'Common base class for all employees'

   def __init__(self, log_stream_name, log_group_name):
      self.log_stream_name = log_stream_name
      self.log_group_name = log_group_name
      self.__create_loggroup()
      self.__create_logstream()      
      self.__init_cloudwatch_alarm()      


   def __create_logstream(self):
     print (f"logstream {self.log_stream_name} erstellt")

   def __create_loggroup(self):
     print (f"logstream {self.log_stream_name} erstellt")
   
     
   def __init_cloudwatch_alarm(self):
     print (f"logstream {self.log_stream_name} erstellt")

   def __get_next_logtoken(self):
     print (f"logstream {self.log_stream_name} erstellt")

   def log(self,message):
     print (f"logstream {self.log_stream_name} erstellt")  
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment