role.py
3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# IAM trust policy (JSON text) passed as AssumeRolePolicyDocument when a role
# is created below: it allows the "lambda.amazonaws.com" service principal to
# perform "sts:AssumeRole" on the role.
AssumeRolePolicyDocument = """{
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "lambda.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ]
} ]
}"""
class Role(object):
    """Manage a single IAM role through an IAM service client.

    The role name comes from ``config['name']``.  Roles are created under
    the ``Path`` prefix with the module-level ``AssumeRolePolicyDocument``
    as their trust policy, and the policy exposed by ``context.policy``
    (if any) is attached on creation and detached on deletion.
    """

    # IAM path prefix under which new roles are created.
    Path = '/kappa/'

    def __init__(self, context, config):
        """Store the context/config and create the IAM client.

        :param context: object handed to ``kappa.aws.get_aws``; its
            ``policy`` attribute (may be falsy) is read by ``create`` and
            ``delete``.
        :param config: mapping that must contain the key ``'name'``.
        """
        self._context = context
        self._config = config
        aws = kappa.aws.get_aws(context)
        self._iam_svc = aws.create_client('iam')
        # Lazily filled in by the ``arn`` property.
        self._arn = None

    @property
    def name(self):
        """The role name taken from ``config['name']``."""
        return self._config['name']

    @property
    def arn(self):
        """The role's ARN, looked up on first access and then cached.

        Returns ``None`` when the lookup fails; the failure is only logged
        at debug level and the lookup is retried on the next access.
        """
        if self._arn is None:
            try:
                response = self._iam_svc.get_role(
                    RoleName=self.name)
                LOG.debug(response)
                self._arn = response['Role']['Arn']
            except Exception:
                # Deliberately best-effort: a missing role is not an error
                # for callers that only want to know whether an ARN exists.
                LOG.debug('Unable to find ARN for role: %s', self.name)
        return self._arn

    def _find_all_roles(self):
        """Return the role records from every page of ``list_roles``.

        Pagination is followed by hand using the ``IsTruncated`` flag and
        ``Marker`` token of each response.  If a call fails, the error is
        logged and whatever was collected so far is returned.
        """
        roles = []
        try:
            response = self._iam_svc.list_roles()
            roles += response['Roles']
            while response['IsTruncated']:
                LOG.debug('getting another page of roles')
                response = self._iam_svc.list_roles(
                    Marker=response['Marker'])
                roles += response['Roles']
        except Exception:
            LOG.exception('Error listing roles')
        return roles

    def exists(self):
        """Return the listed record whose ``RoleName`` matches, else None."""
        for role in self._find_all_roles():
            if role['RoleName'] == self.name:
                return role
        return None

    def create(self):
        """Create the role if it is not already listed.

        After creation, the context's policy (when one is set) is attached
        to the role.  A ``ClientError`` from either call is logged and
        swallowed.  Nothing is returned.
        """
        LOG.debug('creating role %s', self.name)
        if self.exists():
            return
        try:
            response = self._iam_svc.create_role(
                Path=self.Path, RoleName=self.name,
                AssumeRolePolicyDocument=AssumeRolePolicyDocument)
            LOG.debug(response)
            if self._context.policy:
                LOG.debug('attaching policy %s', self._context.policy.arn)
                response = self._iam_svc.attach_role_policy(
                    RoleName=self.name,
                    PolicyArn=self._context.policy.arn)
                LOG.debug(response)
        except ClientError:
            LOG.exception('Error creating Role')

    def delete(self):
        """Detach the context's policy (if any) and delete the role.

        :returns: the ``delete_role`` response, or the last response
            obtained (possibly ``None``) when a ``ClientError`` interrupts
            the sequence; the error is logged and swallowed.
        """
        response = None
        LOG.debug('deleting role %s', self.name)
        try:
            LOG.debug('First detach the policy from the role')
            # The context may carry no policy at all (``create`` guards for
            # the same case); without this check a missing policy raised
            # AttributeError and the role was never deleted.
            policy = self._context.policy
            policy_arn = policy.arn if policy else None
            if policy_arn:
                response = self._iam_svc.detach_role_policy(
                    RoleName=self.name, PolicyArn=policy_arn)
                LOG.debug(response)
            response = self._iam_svc.delete_role(RoleName=self.name)
            LOG.debug(response)
        except ClientError:
            LOG.exception('role %s not found', self.name)
        return response

    def status(self):
        """Return the ``get_role`` response, or None on ``ClientError``."""
        LOG.debug('getting status for role %s', self.name)
        try:
            response = self._iam_svc.get_role(RoleName=self.name)
            LOG.debug(response)
        except ClientError:
            LOG.debug('role %s not found', self.name)
            response = None
        return response