Skip to content

Instantly share code, notes, and snippets.

@kisst
Last active November 7, 2024 09:58
Show Gist options
  • Save kisst/afd0697ce271094f0aee3b07c1d5c008 to your computer and use it in GitHub Desktop.
Save kisst/afd0697ce271094f0aee3b07c1d5c008 to your computer and use it in GitHub Desktop.
Quick script for quick debugging
import boto3
import os
import sys
# Shared EC2 API client used by every helper in this script.
ec2_client = boto3.client("ec2")

# VPC endpoint services managed by this script, mapped to the endpoint type
# each one is created with. "{region}" is a placeholder that is filled in via
# str.format() before the service name is sent to the API.
ENDPOINT_SERVICES = {
    "com.amazonaws.{region}.ssm": "Interface",
    "com.amazonaws.{region}.ssmmessages": "Interface",
    "com.amazonaws.{region}.ec2messages": "Interface",
    "com.amazonaws.{region}.logs": "Interface",
    "com.amazonaws.{region}.s3": "Gateway",  # S3 is created as a Gateway endpoint
    "com.amazonaws.{region}.kms": "Interface",
}

# Name of the security group attached to the interface endpoints.
SECURITY_GROUP_NAME = "SSM_VPC_Endpoint_SG"
def list_vpcs():
    """Print a numbered menu of the account's VPCs and return them.

    Returns:
        The list of VPC description dicts taken from the ``Vpcs`` key of the
        ``describe_vpcs`` response (an empty list when the key is absent).
    """
    available = ec2_client.describe_vpcs().get('Vpcs', [])
    # Menu numbers start at 1 so they match what the selection prompt expects.
    for number, entry in enumerate(available, start=1):
        print(f"{number}. VPC ID: {entry['VpcId']} - CIDR: {entry['CidrBlock']}")
    return available
def choose_vpc(vpcs):
    """Prompt the user to pick a VPC by its menu number and return its ID.

    Args:
        vpcs: List of VPC description dicts (each with a ``VpcId`` key), in
            the same order in which they were shown to the user.

    Returns:
        The ``VpcId`` of the selected entry. An empty answer selects the
        first entry.

    Raises:
        SystemExit: With exit code 1, after printing "Invalid selection.",
            when the answer is not a number or is outside ``1..len(vpcs)``
            (which includes the case of an empty ``vpcs`` list).
    """
    default_choice = 1
    answer = input(f"Select a VPC by number (default is {default_choice}): ").strip()
    try:
        index = int(answer or default_choice) - 1
    except ValueError:
        # Treat non-numeric input like any other invalid selection instead of
        # letting it crash the script with a traceback.
        index = -1

    if 0 <= index < len(vpcs):
        return vpcs[index]['VpcId']

    print("Invalid selection.")
    sys.exit(1)
def get_security_group(vpc_id):
    """Return the ID of the endpoint security group in a VPC, creating it if absent.

    Args:
        vpc_id: ID of the VPC to search for (or create) the group in.

    Returns:
        The ``GroupId`` of the group named ``SECURITY_GROUP_NAME`` in the VPC.
        When no such group exists, one is created with a single ingress rule
        and its new ID is returned.
    """
    lookup = ec2_client.describe_security_groups(
        Filters=[
            {'Name': 'group-name', 'Values': [SECURITY_GROUP_NAME]},
            {'Name': 'vpc-id', 'Values': [vpc_id]},
        ]
    )
    matches = lookup['SecurityGroups']
    if matches:
        # Reuse the existing group; its rules are not inspected or changed.
        return matches[0]['GroupId']

    created = ec2_client.create_security_group(
        GroupName=SECURITY_GROUP_NAME,
        Description="Security group for VPC Endpoints to allow HTTPS access",
        VpcId=vpc_id,
    )
    new_group_id = created['GroupId']

    # Allow inbound HTTPS (TCP 443). NOTE: the rule is open to every IPv4
    # source address (0.0.0.0/0), not just the VPC's own CIDR range.
    https_rule = {
        'IpProtocol': 'tcp',
        'FromPort': 443,
        'ToPort': 443,
        'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
    }
    ec2_client.authorize_security_group_ingress(
        GroupId=new_group_id,
        IpPermissions=[https_rule],
    )
    print(f"Created security group {SECURITY_GROUP_NAME} with HTTPS ingress rule.")
    return new_group_id
def delete_security_group(vpc_id):
    """Remove the endpoint security group from a VPC when it is present.

    Args:
        vpc_id: ID of the VPC whose ``SECURITY_GROUP_NAME`` group should go.

    Does nothing (and prints nothing) when no matching group is found. Only
    the first match returned by the API is deleted.
    """
    name_and_vpc = [
        {'Name': 'group-name', 'Values': [SECURITY_GROUP_NAME]},
        {'Name': 'vpc-id', 'Values': [vpc_id]},
    ]
    matches = ec2_client.describe_security_groups(Filters=name_and_vpc)['SecurityGroups']
    if not matches:
        return

    ec2_client.delete_security_group(GroupId=matches[0]['GroupId'])
    print(f"Deleted security group {SECURITY_GROUP_NAME}")
def endpoint_exists(vpc_id, service_name):
    """Check whether a live VPC endpoint for the given service exists in the VPC.

    Args:
        vpc_id: ID of the VPC to look in.
        service_name: Fully-qualified endpoint service name, e.g.
            ``com.amazonaws.us-west-2.ssm``.

    Returns:
        True when at least one endpoint for the service is found whose state
        is not ``deleting`` or ``deleted``; False otherwise.
    """
    endpoints = ec2_client.describe_vpc_endpoints(
        Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]},
            {'Name': 'service-name', 'Values': [service_name]},
        ]
    ).get('VpcEndpoints', [])

    # Endpoints that are being (or have just been) deleted are still returned
    # by the API. Counting them would make an "enable" that follows a recent
    # "disable" skip creation even though nothing usable is left.
    gone_states = {'deleting', 'deleted'}
    return any(
        str(endpoint.get('State', '')).lower() not in gone_states
        for endpoint in endpoints
    )
def toggle_endpoints(vpc_id, region, action):
    """Create or delete the VPC endpoints listed in ``ENDPOINT_SERVICES``.

    Args:
        vpc_id: ID of the VPC to operate on.
        region: Region name substituted into each service-name template.
        action: ``'enable'`` to create missing endpoints (and the shared
            security group), ``'disable'`` to delete the endpoints and then
            the security group.

    Raises:
        SystemExit: With exit code 1 when ``action`` is neither ``'enable'``
            nor ``'disable'``.

    Errors for an individual service are printed and do not stop processing
    of the remaining services.
    """
    # Validate once up front rather than on every loop iteration.
    if action not in ('enable', 'disable'):
        print("Invalid action specified.")
        sys.exit(1)

    enabling = action == 'enable'
    # The security group is only needed (and only created) when enabling.
    security_group_id = get_security_group(vpc_id) if enabling else None

    for service, endpoint_type in ENDPOINT_SERVICES.items():
        endpoint_service_name = service.format(region=region)
        try:
            if enabling:
                if endpoint_exists(vpc_id, endpoint_service_name):
                    print(f"VPC endpoint for {endpoint_service_name} already exists. Skipping creation.")
                    continue
                if endpoint_type == "Gateway":
                    # Gateway endpoints are attached to every route table in the VPC.
                    route_tables = ec2_client.describe_route_tables(
                        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
                    )['RouteTables']
                    ec2_client.create_vpc_endpoint(
                        VpcId=vpc_id,
                        ServiceName=endpoint_service_name,
                        VpcEndpointType='Gateway',
                        RouteTableIds=[table['RouteTableId'] for table in route_tables],
                    )
                else:
                    # NOTE(review): no SubnetIds are passed here, so the
                    # endpoint may be created without network interfaces --
                    # confirm against the create_vpc_endpoint documentation.
                    ec2_client.create_vpc_endpoint(
                        VpcId=vpc_id,
                        ServiceName=endpoint_service_name,
                        VpcEndpointType='Interface',
                        PrivateDnsEnabled=True,
                        SecurityGroupIds=[security_group_id],
                    )
                print(f"Enabled VPC endpoint for {endpoint_service_name}")
            else:
                endpoints = ec2_client.describe_vpc_endpoints(Filters=[
                    {'Name': 'vpc-id', 'Values': [vpc_id]},
                    {'Name': 'service-name', 'Values': [endpoint_service_name]},
                ])['VpcEndpoints']
                endpoint_ids = [endpoint['VpcEndpointId'] for endpoint in endpoints]
                if endpoint_ids:
                    result = ec2_client.delete_vpc_endpoints(VpcEndpointIds=endpoint_ids)
                    # delete_vpc_endpoints reports per-endpoint failures in the
                    # response instead of raising, so surface them explicitly.
                    failures = result.get('Unsuccessful', [])
                    if failures:
                        for failure in failures:
                            reason = failure.get('Error', {}).get('Message', 'unknown error')
                            print(
                                f"Error {action} endpoint for {endpoint_service_name}: "
                                f"{failure.get('ResourceId')}: {reason}"
                            )
                        continue
                print(f"Disabled VPC endpoint for {endpoint_service_name}")
        except Exception as e:
            # Best effort: report the failure and carry on with the next service.
            print(f"Error {action} endpoint for {endpoint_service_name}: {e}")

    if not enabling:
        # security_group_id is never populated when disabling, so the cleanup
        # must not be gated on it.
        # NOTE(review): deletion may fail while the deleted interface
        # endpoints' network interfaces still reference the group -- if so,
        # rerun "disable" once the endpoints are fully gone.
        try:
            delete_security_group(vpc_id)
        except Exception as e:
            print(f"Error deleting security group {SECURITY_GROUP_NAME}: {e}")
def main():
    """Run the interactive flow: pick a region, pick a VPC, enable or disable endpoints.

    Prompts for the AWS region (defaulting to ``AWS_DEFAULT_REGION`` or
    ``us-west-2``), lists the VPCs in that region, asks which one to use and
    whether to enable or disable the endpoints, then applies the action.

    Raises:
        SystemExit: With exit code 1 when the action entered is neither
            ``enable`` nor ``disable``.
    """
    global ec2_client

    print("Welcome to the VPC Endpoint Configuration Tool!")

    # Default to environment variable for AWS region
    default_region = os.getenv('AWS_DEFAULT_REGION', 'us-west-2')
    region = input(f"Enter the AWS region (default is {default_region}): ").strip() or default_region

    # Point the shared client at the region the user chose. Otherwise the VPC
    # listing and endpoint calls go to the client's default region while the
    # service names are built for the region entered here.
    ec2_client = boto3.client('ec2', region_name=region)

    # List and pick a VPC, defaulting to the first one
    print("\nListing available VPCs...")
    vpcs = list_vpcs()
    vpc_id = choose_vpc(vpcs)
    print(f"\nSelected VPC: {vpc_id}")

    # Choose action, defaulting to 'enable'
    action = input("Do you want to enable or disable endpoints? (default is 'enable'): ").strip().lower() or 'enable'
    if action not in ['enable', 'disable']:
        print("Invalid action. Please enter 'enable' or 'disable'.")
        sys.exit(1)

    # Enable or disable endpoints
    toggle_endpoints(vpc_id, region, action)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment