Last active
December 16, 2021 10:16
-
-
Save mnanchev/8b539c46be3b64826640a173dacf1b98 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
from urllib.parse import unquote_plus

import boto3
class QueueMessageSender:
    """Build scan-result messages from S3 events and publish them to SQS."""

    def __init__(self) -> None:
        self.s3_client = boto3.client("s3")
        self.sqs_client = boto3.client("sqs")

    def send_message_to_queue(self, queue_url, message):
        """Send ``message`` to the queue at ``queue_url`` as a JSON body.

        Returns:
            The ``send_message`` response on success. For the two handled
            SQS errors a description is printed and returned as a string
            instead of raising.
        """
        try:
            return self.sqs_client.send_message(
                QueueUrl=queue_url, MessageBody=json.dumps(message)
            )
        except self.sqs_client.exceptions.InvalidMessageContents:
            print("Invalid Message Contents")
            return "Error the content of the message is invalid"
        except self.sqs_client.exceptions.UnsupportedOperation:
            print("Unsupported Operation")
            return "unsupported operation"

    def build_message(self, event):
        """Build the queue message for the first record of an S3 event.

        Only ``event["Records"][0]`` is inspected. The object's tag set is
        fetched from S3 and the value of its first tag becomes ``result``.

        Returns:
            A dict with the keys ``result``, ``bucket`` and ``key``.

        Raises:
            IndexError: If the object has no tags at all.
        """
        s3_record = event["Records"][0]["s3"]
        bucket = s3_record["bucket"]["name"]
        # S3 event notifications URL-encode the object key (a space arrives
        # as "+", special characters as "%XX"); decode it so the tagging
        # lookup and the later delete address the real object.
        key = unquote_plus(s3_record["object"]["key"])
        tagging = self.s3_client.get_object_tagging(Bucket=bucket, Key=key)
        return {
            "result": tagging["TagSet"][0]["Value"],
            "bucket": bucket,
            "key": key,
        }

    @staticmethod
    def check_if_infected(result):
        """Return True only when ``result`` equals the string "infected"."""
        return result == "infected"
class ObjectCleaner:
    """Delete objects from S3 and report the outcome as a short string."""

    def __init__(self) -> None:
        self.s3_client = boto3.client("s3")

    def delete_object(self, bucket, key):
        """Delete ``key`` from ``bucket``.

        Returns:
            "Object Deleted" on success, otherwise one of "No Such Key",
            "Server Error" or "Client Error". Failures are printed and
            reported through the return value rather than raised.
        """
        try:
            self.s3_client.delete_object(Bucket=bucket, Key=key)
            return "Object Deleted"
        except self.s3_client.exceptions.NoSuchKey:
            print("No Such Key")
            return "No Such Key"
        except self.s3_client.exceptions.ClientError as error:
            # The S3 client has no ``exceptions.ServerError`` attribute, so
            # naming it in an ``except`` clause raised AttributeError and hid
            # the real failure. Service-side failures arrive as ClientError;
            # tell them apart by the HTTP status code of the response.
            status = error.response.get("ResponseMetadata", {}).get(
                "HTTPStatusCode", 0
            )
            if status >= 500:
                print("Server Error")
                return "Server Error"
            print("Client Error")
            return "Client Error"
# Destination queue URLs, read from the environment once at import time.
SCANNER_PROFILE_RESULT_QUEUE_URL = os.environ["SCANNER_PROFILE_RESULT_QUEUE_URL"]
# This used to re-read the profile variable (copy/paste slip), so video
# results could never reach a queue of their own. Falling back to the profile
# URL keeps deployments that never defined the video variable working as before.
SCANNER_VIDEO_RESULT_QUEUE_URL = os.environ.get(
    "SCANNER_VIDEO_RESULT_QUEUE_URL", SCANNER_PROFILE_RESULT_QUEUE_URL
)

# Module-level singletons so the boto3 clients are created once per import
# rather than on every handler call.
QUEUE_MESSAGE = QueueMessageSender()
OBJECT_CLEANER = ObjectCleaner()
def scanning_event_handler(event, __):
    """Handle a scan-result event for an S3 object.

    Builds the result message for the event. When the result is infected,
    the message is sent to the video queue if the bucket name contains
    "cdn", otherwise to the profile queue, and the object is then deleted.
    The second positional argument is unused (presumably the Lambda
    context -- confirm against the deployment).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded ``body`` string
        describing what was done. The same dict is printed before returning.
    """
    scan_message = QUEUE_MESSAGE.build_message(event)
    bucket_name = scan_message["bucket"]

    if not QueueMessageSender.check_if_infected(scan_message["result"]):
        summary = "Object is not infected"
    else:
        # Pick the destination queue and the matching report text first, so
        # the send-then-delete sequence is written only once.
        if "cdn" in bucket_name:
            target_queue = SCANNER_VIDEO_RESULT_QUEUE_URL
            summary = "Video message sent to queue. Infected Object was deleted"
        else:
            target_queue = SCANNER_PROFILE_RESULT_QUEUE_URL
            summary = "Profile message sent to queue. Infected Object was deleted"
        QUEUE_MESSAGE.send_message_to_queue(target_queue, scan_message)
        OBJECT_CLEANER.delete_object(bucket_name, scan_message["key"])

    outcome = {"statusCode": 200, "body": json.dumps(summary)}
    print(outcome)
    return outcome
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment