Last active
September 26, 2022 04:58
-
-
Save kanghyojun/c691aa71d7b0e2b854311b61e22dfa75 to your computer and use it in GitHub Desktop.
쿼리 딜리버리 SSH 터널링 진단 도구
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""쿼리 딜리버리 접속용 SSH 터널링 진단 도구 | |
- AWS API 키가 필요합니다. | |
- Python3.6 이상의 버전에서 실행하는 것을 추천드립니다. | |
- boto3가 필요합니다. [virtualenv](https://docs.python.org/ko/3/library/venv.html) 환경에서 작업하시는 것을 추천합니다. | |
``` | |
$ python -m venv .venv | |
$ source .venv/bin/activate | |
$ pip install boto3 | |
$ curl -o ssh_tunneling_test.py https://gist.githubusercontent.com/kanghyojun/c691aa71d7b0e2b854311b61e22dfa75/raw/d46888a956010e86b05a13a9befa945f0923cc90/ssh_tunneling_test.py | |
$ python ssh_tunneling_test.py | |
""" | |
import grp | |
import ipaddress | |
import os.path | |
import pathlib | |
import sys | |
from boto3 import client as boto3_client | |
from boto3 import resource as boto3_resource | |
QD_NAT_IP = ipaddress.ip_address("3.34.56.103") | |
pubkey = ( | |
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC9JXkvuPc3ZCWIROpM3ph1R+E5O5ppkewlaoO1rXdSvUjo/V7jP1JxPFobMnYMPXH4pTLLjv1eC" | |
"XJBQMOzPv1e2gJcrDdHd9AJ8BgdaJICHzr9NSVcIGxe57qYXSp+rmIGNRDa73ftkvrPmf8ogylLldELVoLgF/bfihBKq42X6UgIs2mA6TJnWMRv6Yq" | |
"2ng3Umt105DNntROTS/BmJm5LLfOV1r5k99spKr0OrJl+hwVfQb9kbxcwVOAoF7HcGGOr/Utp8H2IGofCKDrqVCCOF7HOITw9rG03VQnX0rgOJNLZg" | |
"OShmSjCokp89P77oaEudqBBZiHtK3WC1irFtWpK8MBTk2XNe6tVP0KC62SA4WX/Ta52u6yKdHd5LH81wDxWWSOBmrHJA7PoKHTVlkwHvRK3MplfL3T" | |
"iASYgLzhQPFfizS9TXshTzK1zBlpWCoOJ1zjzX+knyU24l7a3f2jASb8n9LyW3fd7RYhoyj7fay9ZCddTRO3VTW9v9rMlvU0=" | |
) | |
ec2 = boto3_client("ec2") | |
ec2_resource = boto3_resource("ec2") | |
def check_authorized_keys(): | |
authorized_keys = ( | |
pathlib.Path(os.path.expanduser("~querydelivery")) / ".ssh" / "authorized_keys" | |
) | |
with authorized_keys.open("r") as f: | |
assert pubkey in f.read(), "authorized_keys error" | |
def check_user(): | |
assert get_users(), "querydelivery 유저가 생성되지 않았습니다. " | |
def get_users(): | |
r = set() | |
groups = grp.getgrall() | |
for group in groups: | |
for user in group[3]: | |
r.add(user) | |
return "querydelivery" in user | |
def get_instance_name(inst): | |
name = None | |
for tag in inst["Tags"]: | |
if tag["Key"] == "Name": | |
name = tag["Value"] | |
break | |
return name if name is not None else "이름 없음" | |
def get_instances(): | |
instance_results = ec2.describe_instances() | |
for reservations in instance_results["Reservations"]: | |
for inst in reservations["Instances"]: | |
yield inst | |
def instance_to_string(instance): | |
return "{} ({})".format(instance["InstanceId"], get_instance_name(instance)) | |
def get_security_groups_ssh_cidr_ip(instance): | |
sgs = ec2.describe_security_groups( | |
GroupIds=[sg["GroupId"] for sg in instance["SecurityGroups"]] | |
) | |
for group in sgs["SecurityGroups"]: | |
for ip_permission in group["IpPermissions"]: | |
if not ip_permission["IpRanges"]: | |
continue | |
if not ip_permission.get("FromPort"): | |
continue | |
if ip_permission["FromPort"] == 22: | |
for range in ip_permission["IpRanges"]: | |
try: | |
yield group, ipaddress.ip_network(range["CidrIp"]) | |
except Exception: | |
continue | |
def check_ssh_security_group(instance): | |
for group, ip_network in get_security_groups_ssh_cidr_ip(instance): | |
if QD_NAT_IP in ip_network: | |
yield group, ip_network | |
def check_vpc_network_acl(vpc_id): | |
vpc = ec2_resource.Vpc(vpc_id) | |
for acl in vpc.network_acls.all(): | |
for entry in acl.entries: | |
if ( | |
QD_NAT_IP in ipaddress.ip_network(entry["CidrBlock"]) | |
and not entry["Egress"] | |
): | |
yield acl.id, entry | |
def check_vpc_igw(vpc_id): | |
vpc = ec2_resource.Vpc(vpc_id) | |
return vpc.internet_gateways | |
def main(): | |
print("\n\n# 검사하고 싶은 Instance의 번호를 입력해주세요.\n") | |
instances = list(get_instances()) | |
for i, instance in enumerate(instances): | |
print("{}. {}".format(i, instance_to_string(instance))) | |
select_instnace_index = input() | |
try: | |
selected_instance = instances[int(select_instnace_index)] | |
except ValueError: | |
print("숫자를 입력해주세요.") | |
sys.exit(1) | |
except IndexError: | |
print("범위 내의 번호를 선택해주세요") | |
sys.exit(1) | |
print("\n{}의 설정 검사를 시작합니다.\n".format(instance_to_string(selected_instance))) | |
print("# EC2 Instance의 Security Group 설정을 검사합니다.\n") | |
ssh_security_groups = check_ssh_security_group(selected_instance) | |
if not ssh_security_groups: | |
print("EC2 Instance의 Security Group 설정이 수정되어야합니다.") | |
print("{}로 설정된 Security Group을 찾을 수 없습니다.".format(QD_NAT_IP)) | |
sys.exit(1) | |
else: | |
print("EC2 Instance의 Security Group 설정이 정상적으로 되었습니다.") | |
for group, ip_network in ssh_security_groups: | |
print( | |
"{} ({})에서 {} 네트워크 대역 설정을 발견".format( | |
group["GroupId"], group["GroupName"], ip_network | |
) | |
) | |
print("\n\n# EC2 Instance의 VPC 설정을 검사합니다.\n") | |
if not selected_instance.get("VpcId"): | |
print("VPC 설정되어 있지 않음") | |
sys.exit(1) | |
vpc_acls = check_vpc_network_acl(selected_instance["VpcId"]) | |
if not vpc_acls: | |
print("VPC Network ACL 설정이 수정되어야합니다.") | |
sys.exit(1) | |
else: | |
print("VPC Network ACL 설정이 정상적으로 되었습니다.") | |
for acl_id, entry in vpc_acls: | |
print( | |
"{}(룰번호: {}) 에서 CIDR 블락 `{}`에 포함됨이 확인되었습니다.".format( | |
acl_id, entry["RuleNumber"], entry["CidrBlock"] | |
) | |
) | |
print("\n\n# EC2 Instance의 VPC internet gateway 설정을 검사합니다.\n") | |
vpc_igws = check_vpc_igw(selected_instance["VpcId"]) | |
if not vpc_igws: | |
print("IGW 설정이 없습니다.") | |
else: | |
print("IGW 설정이 정상적으로 되어있습니다.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment