Always-Try(정보보안 및 일상)

파이썬을 이용한 AWS Security Group 보안 (휴먼 에러 방지) 본문

AWS

파이썬을 이용한 AWS Security Group 보안 (휴먼 에러 방지)

Always-Try 2021. 11. 22. 02:24

AWS 시큐리티 그룹을 운영하다 보면 종종 대상 포트를 0으로 입력하는 실수를 하는 경우가 있는데, 이때 Slack으로 알림을 주는 기능을 구현했다. (테스트는 필자의 개인 AWS 계정 및 개인 Slack에서 진행)

 

 

import json
import os
import requests

# URL that Send_Message POSTs the Slack payload to. The real value has been
# redacted in this listing ("삭제" means "deleted"); supply the actual webhook
# URL before running.
HOOK_URL = "삭제"



# Sample event captured for an AuthorizeSecurityGroupEgress call in which the
# rule was created with fromPort/toPort both 0. It is the input passed to
# lambda_handler when this file is run as a script. Identifying values have
# been redacted ("삭제" means "deleted").
event = {
    "eventVersion": "1.08",
    "userIdentity": {
        "type": "IAMUser",
        "principalId": "삭제",
        "arn": "arn:aws:iam::삭제:user/삭제",
        "accountId": "삭제",
        "accessKeyId": "삭제",
        "userName": "삭제",
        "sessionContext": {
            "sessionIssuer": {},
            "webIdFederationData": {},
            "attributes": {
                "creationDate": "2021-11-21T10:39:50Z",
                "mfaAuthenticated": "false"
            }
        }
    },
    "eventTime": "2021-11-21T13:42:05Z",
    "eventSource": "ec2.amazonaws.com",
    "eventName": "AuthorizeSecurityGroupEgress",
    "awsRegion": "ap-northeast-2",
    "sourceIPAddress": "삭제",
    "userAgent": "EC2ConsoleFrontend, aws-internal/3 aws-sdk-java/1.12.100 Linux/5.4.147-83.259.amzn2int.x86_64 OpenJDK_64-Bit_Server_VM/25.312-b07 java/1.8.0_312 vendor/Oracle_Corporation cfg/retry-mode/standard",
    "requestParameters": {
        "groupId": "sg-0969ece471f52c916",
        "ipPermissions": {
            "items": [
                {
                    "ipProtocol": "tcp",
                    "fromPort": 0,
                    "toPort": 0,
                    "groups": {},
                    "ipRanges": {
                        "items": [
                            {
                                "cidrIp": "0.0.0.0/0"
                            }
                        ]
                    },
                    "ipv6Ranges": {},
                    "prefixListIds": {}
                }
            ]
        }
    },
    "responseElements": {
        "requestId": "22ced91c-fcac-4ab2-91d2-87b93d83ffbf",
        "_return": True,
        "securityGroupRuleSet": {
            "items": [
                {
                    "groupOwnerId": "354913817145",
                    "groupId": "sg-0969ece471f52c916",
                    "securityGroupRuleId": "sgr-046d839ea9c347645",
                    "isEgress": True,
                    "ipProtocol": "tcp",
                    "fromPort": 0,
                    "toPort": 0,
                    "cidrIpv4": "0.0.0.0/0"
                }
            ]
        }
    },
    "requestID": "22ced91c-fcac-4ab2-91d2-87b93d83ffbf",
    "eventID": "ed70f7aa-3773-4738-8d37-c569c4b7358d",
    "readOnly": False,
    "eventType": "AwsApiCall",
    "managementEvent": True,
    "recipientAccountId": "354913817145",
    "eventCategory": "Management"
}




def Send_Message(slack_message, timeout=10):
    """Post *slack_message* as JSON to the webhook at ``HOOK_URL``.

    Args:
        slack_message: JSON-serialisable payload (here, a dict with a
            ``blocks`` list) to send in the request body.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            endpoint.

    Raises:
        requests.exceptions.RequestException: On connection failure or when
            the timeout elapses. The HTTP status of the response is not
            checked.
    """
    requests.post(
        HOOK_URL,
        data=json.dumps(slack_message),
        headers={'Content-Type': 'application/json'},
        timeout=timeout,
    )

def event_set(event):
    """Extract rule details from *event* and run the port check on each rule.

    Every entry under ``requestParameters.ipPermissions.items`` is inspected,
    not only the first, so a port-0 rule is not missed when several rules are
    authorised in a single request. Entries that carry no ``fromPort`` or
    ``toPort`` are skipped because there is no port to check.

    Args:
        event: Event dict shaped like the sample ``event`` in this file.

    Raises:
        KeyError: If top-level identity/request fields (``userIdentity``,
            ``sourceIPAddress``, ``awsRegion``, ``eventTime``,
            ``requestParameters.groupId``) are missing.
    """
    data = event
    identity = data['userIdentity']
    accountId = identity['accountId']
    principalId = identity['principalId']
    arn = identity['arn']
    sourceIP = data['sourceIPAddress']
    awsRegion = data['awsRegion']
    eventTime = data['eventTime']
    request = data['requestParameters']
    groupId = request['groupId']

    # `ipPermissions` (and the nested range containers) may be an empty dict,
    # as `ipv6Ranges` is in the sample event, so never index them blindly.
    permissions = (request.get('ipPermissions') or {}).get('items') or []
    for permission in permissions:
        fromport = permission.get('fromPort')
        toport = permission.get('toPort')
        if fromport is None or toport is None:
            continue

        protocol = permission.get('ipProtocol')
        ipv6 = permission.get('ipv6Ranges', {})

        # Only the first IPv4 range is reported, matching the original
        # behaviour; rules without an IPv4 range yield ipv4=None.
        ranges = (permission.get('ipRanges') or {}).get('items') or []
        first_range = ranges[0] if ranges else {}
        ipv4 = first_range.get('cidrIp')
        description = first_range.get('description', "None")

        IP_Port_Checker(data, arn, principalId, accountId, sourceIP, awsRegion, eventTime, groupId, protocol, toport, fromport, description, ipv4, ipv6)

def _build_port_zero_message(username, awsRegion, groupId, protocol, toport, eventTime, description):
    """Build the Slack Block Kit payload announcing a port-0 rule."""
    # The console link uses the event's own region so it opens the group
    # where it actually lives, not a fixed region.
    console_url = (
        f"https://{awsRegion}.console.aws.amazon.com/ec2/v2/home"
        f"?region={awsRegion}#SecurityGroup:groupId={groupId}"
    )
    return {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "AWS SecurityGroup Notice",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*SecurityGroupID*\n<{console_url}|{groupId}>",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*UserID*\n{username}",
                    },
                ],
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*Information*\n Port {toport} Open",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Protocol*\n{protocol}",
                    },
                ],
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*CreateTime*\n{eventTime}",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Description*\n{description}",
                    },
                ],
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*Notice*",
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "plain_text",
                    "text": f"누구누구님, AWS {awsRegion} 리전 {groupId}에 대해 포트 재지정 부탁 드립니다.",
                },
            },
        ]
    }


def IP_Port_Checker(data, arn, principalId, accountId, sourceIP, awsRegion, eventTime, groupId, protocol, toport, fromport, description, ipv4, ipv6):
    """Send a Slack notice when a rule's port range is exactly 0-0.

    Args:
        data: The full event dict (not used by the check itself).
        arn: ARN of the caller; the path segment after the first ``/`` is
            shown as the user id.
        principalId, accountId, sourceIP, ipv4, ipv6: Accepted for interface
            compatibility; not used in the message.
        awsRegion: Region of the event, used in the console link and notice.
        eventTime: Timestamp string shown as the creation time.
        groupId: Security group id shown and linked in the message.
        protocol: Protocol string shown in the message.
        toport: Upper bound of the rule's port range.
        fromport: Lower bound of the rule's port range.
        description: Rule description shown in the message.

    Returns:
        None. ``Send_Message`` is called only when both ports are 0.
    """
    # Use logical `and`: the bitwise form `fromport == 0 & toport == 0` parses
    # as `fromport == (0 & toport) == 0`, which ignores `toport` entirely.
    if not (fromport == 0 and toport == 0):
        return

    # Same segment as arn.split('/')[1], but an ARN without '/' (e.g. a root
    # ARN) falls back to the whole ARN instead of raising IndexError.
    _, separator, resource = arn.partition('/')
    username = resource.split('/')[0] if separator else arn

    Send_Message(
        _build_port_zero_message(
            username, awsRegion, groupId, protocol, toport, eventTime, description
        )
    )

def lambda_handler(event, context=None):
    """Handler entry point: pass *event* to ``event_set`` for inspection.

    Args:
        event: Event dict shaped like the sample ``event`` in this file.
        context: Accepted and ignored. The AWS Lambda runtime invokes a
            handler with two positional arguments (event, context), so a
            one-argument signature would fail with TypeError when deployed.
            It defaults to None so direct single-argument calls keep working.
    """
    event_set(event)

if __name__ == "__main__":
    # Script run: push the module-level sample event through the handler.
    lambda_handler(event)

 

 

Comments