AWS

[AWS] 파이썬 boto3를 사용한 AWS Security Group 과도한 포트 오픈 점검

Always-Try 2022. 2. 11. 02:07
  • 인/아웃바운드 점검
  • 80, 443 포트 IP ALL 오픈 여부 점검
  • Unknown 포트 IP ALL 오픈 여부 점검 (추가 검증 필요)
  • Security Group과 매핑되는 EC2 찾기
import boto3
from botocore.exceptions import ClientError

# Module-level EC2 client shared by every function in this file. No region or
# credentials are passed here, so boto3 resolves them from its own defaults.
ec2 = boto3.client('ec2')


def get_sg_list():
    """Run the inbound and then the outbound audit over every security group.

    The security groups are fetched once with ``describe_security_groups``.
    A header line is printed before each pass, and each group's ``GroupId``
    and ``GroupName`` are handed to ``check_inbound_rule`` (first pass) or
    ``check_outbound_rule`` (second pass). Nothing is returned; all results
    are printed by the check functions.
    """
    security_groups = ec2.describe_security_groups()['SecurityGroups']

    def run_pass(header, checker):
        # One audit pass: announce it, then check each group in listing order.
        print(header)
        for group in security_groups:
            checker(group['GroupId'], group['GroupName'])

    run_pass("1. Inbound ALL:ALL policy check", check_inbound_rule)
    run_pass("\n\n2. Outbound ALL:ALL policy check", check_outbound_rule)



def get_instance_name(sg_id):
    """Return a display name for the first EC2 instance using a security group.

    Args:
        sg_id: Security group ID to look up (for example ``"sg-0123abcd"``).

    Returns:
        The value of the instance's ``Name`` tag. If the first matching
        instance has no non-empty ``Name`` tag, its ``InstanceId`` is returned
        instead. ``None`` is returned when no instance uses the group or when
        the lookup fails.

    The lookup is best-effort: any error from ``describe_instances`` is
    printed and turned into ``None`` so the surrounding audit keeps running.
    """
    try:
        # Let EC2 narrow the result to instances attached to this group
        # instead of downloading every instance in the account on each call.
        response = ec2.describe_instances(
            Filters=[{'Name': 'instance.group-id', 'Values': [sg_id]}]
        )
    except Exception as e:
        # Deliberately broad: a failed name lookup must not abort the audit.
        print('get_instance_error', e)
        return None

    for reservation in response.get('Reservations', []):
        for instance in reservation.get('Instances', []):
            # Exact match only: a substring test would let "sg-123" match
            # an instance that is attached to "sg-12345".
            attached = {
                group.get('GroupId')
                for group in instance.get('SecurityGroups', [])
            }
            if sg_id not in attached:
                continue

            # Tag order is not meaningful, so look the Name tag up by key.
            for tag in instance.get('Tags', []):
                if tag.get('Key') == 'Name' and tag.get('Value'):
                    return tag['Value']

            # Untagged or unnamed instance: the ID still identifies it.
            return instance.get('InstanceId')

    return None


def _load_permissions(sg_id, key, direction):
    """Fetch one permission list (``key``) of a security group.

    ``direction`` ("inbound" or "outbound") only labels the error message.
    Returns an empty list when the response does not have the expected shape.
    """
    try:
        return ec2.describe_security_groups(GroupIds=[sg_id])['SecurityGroups'][0][key]
    except KeyError as e:
        print(f'{direction} keyerror', e)
    except IndexError as e:
        print(f'{direction} Indexerror', e)
    return []


def _collect_open_rules(permissions):
    """Return ``(cidr, port, description, all_traffic)`` for each reportable source.

    A source range is reported when either

    * the rule's protocol is ``'-1'`` (all traffic), whatever the CIDR, or
    * the range is open to the world (``0.0.0.0/0`` or ``::/0``).

    Every entry of ``IpRanges`` and ``Ipv6Ranges`` is inspected. Rules that
    carry no CIDR ranges at all (security-group references, prefix lists)
    simply contribute nothing instead of raising and cutting the scan short.

    ``port`` is ``"ALL"`` for all-traffic rules and for rules without a
    ``FromPort``, a single number when ``FromPort == ToPort``, otherwise
    ``"from-to"``. ``description`` is ``None`` when the range has none.
    """
    world_cidrs = ('0.0.0.0/0', '::/0')
    findings = []
    for rule in permissions:
        all_traffic = rule.get('IpProtocol') == '-1'
        from_port = rule.get('FromPort')
        to_port = rule.get('ToPort')
        if all_traffic or from_port is None:
            port = 'ALL'
        elif to_port is None or to_port == from_port:
            port = str(from_port)
        else:
            # Show the whole range; printing only FromPort hides e.g. 0-65535.
            port = f"{from_port}-{to_port}"

        sources = [
            (entry.get('CidrIp'), entry.get('Description'))
            for entry in rule.get('IpRanges', [])
        ]
        sources += [
            (entry.get('CidrIpv6'), entry.get('Description'))
            for entry in rule.get('Ipv6Ranges', [])
        ]
        for cidr, description in sources:
            if all_traffic or cidr in world_cidrs:
                findings.append((cidr, port, description, all_traffic))
    return findings


def check_inbound_rule(sg_id, sg_Name):
    """Print every overly permissive inbound rule of one security group.

    Args:
        sg_id: ID of the security group to inspect.
        sg_Name: Name of the group; only used in the printed report.

    One line is printed per finding returned by ``_collect_open_rules`` for
    the group's ``IpPermissions``. Nothing is returned.

    Note on the port filter: the previous condition
    ``(proto == 'tcp' or 'udp') and (FromPort == '80' or '443')`` was always
    true (a non-empty string is truthy, and ``FromPort`` is an int), so every
    world-open port was reported, not just 80/443. That effective behavior is
    kept, and made explicit, so that unknown open ports stay in the report.
    All-traffic rules now print their real CIDR and description instead of a
    hard-coded ``0.0.0.0/0`` / ``None``.
    """
    findings = _collect_open_rules(_load_permissions(sg_id, 'IpPermissions', 'inbound'))
    if not findings:
        return

    # One lookup per group is enough; the name is the same for every finding.
    instance_name = get_instance_name(sg_id)
    for cidr, port, description, _all_traffic in findings:
        print(f"Instance_Name: {instance_name} // Security_Group_ID: {sg_id} // Security_Group_Name: {sg_Name} // IP_Address: {cidr} // Port: {port} // Description: {description}")


def check_outbound_rule(sg_id, sg_Name):
    """Print every overly permissive outbound rule of one security group.

    Args:
        sg_id: ID of the security group to inspect.
        sg_Name: Name of the group; only used in the printed report.

    One line is printed per finding returned by ``_collect_open_rules`` for
    the group's ``IpPermissionsEgress``. Nothing is returned.

    Egress rules without IPv4 ranges no longer raise ``IndexError`` and stop
    the scan of the remaining rules. As with the inbound check, any port open
    to the world is reported (see ``check_inbound_rule`` for why).
    """
    findings = _collect_open_rules(_load_permissions(sg_id, 'IpPermissionsEgress', 'outbound'))
    if not findings:
        return

    instance_name = get_instance_name(sg_id)
    for cidr, port, description, all_traffic in findings:
        # The two layouts below differ in their labels; both are kept exactly
        # as they were so existing consumers of the output are not broken.
        if all_traffic:
            print(f"Instance_Name: {instance_name} // Security Group ID: {sg_id} // Security_Group_Name: {sg_Name} // IP Address : {cidr} // Port: ALL // Description: {description}")
        else:
            print(f"Instance_Name: {instance_name} // Security_Group_ID: {sg_id} // Security_Group_Name: {sg_Name} // IP_Address: {cidr} // Port: {port} // Description: {description}")



# Run the full audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    get_sg_list()