Mitch Garnaat

A WIP commit on the new refactor for support of Python and other features.

# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
......
# Copyright (c) 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
import os
import datetime
import jmespath
import boto3
LOG = logging.getLogger(__name__)
def json_encoder(obj):
"""JSON encoder that formats datetimes as ISO8601 format."""
if isinstance(obj, datetime.datetime):
return obj.isoformat()
else:
return obj
class AWSClient(object):
def __init__(self, service_name, region_name, profile_name,
record_path=None):
self._service_name = service_name
self._region_name = region_name
self._profile_name = profile_name
self._record_path = record_path
self._client = self._create_client()
@property
def service_name(self):
return self._service_name
@property
def region_name(self):
return self._region_name
@property
def profile_name(self):
return self._profile_name
def _record(self, op_name, kwargs, data):
"""
This is a little hack to enable easier unit testing of the code.
Since botocore/boto3 has its own set of tests, I'm not interested in
trying to test it again here. So, this recording capability allows
us to save the data coming back from botocore as JSON files which
can then be used by the mocked awsclient in the unit test directory.
To enable this, pass in a record_path to the contructor and the JSON
data files will get stored in this path.
"""
if self._record_path:
path = os.path.expanduser(self._record_path)
path = os.path.expandvars(path)
path = os.path.join(path, self.service_name)
if not os.path.isdir(path):
os.mkdir(path)
path = os.path.join(path, self.region_name)
if not os.path.isdir(path):
os.mkdir(path)
path = os.path.join(path, self.account_id)
if not os.path.isdir(path):
os.mkdir(path)
filename = op_name
if kwargs:
for k, v in kwargs.items():
if k != 'query':
filename += '_{}_{}'.format(k, v)
filename += '.json'
path = os.path.join(path, filename)
with open(path, 'wb') as fp:
json.dump(data, fp, indent=4, default=json_encoder,
ensure_ascii=False)
def _create_client(self):
session = boto3.session.Session(
region_name=self._region_name, profile_name=self._profile_name)
return session.client(self._service_name)
def call(self, op_name, query=None, **kwargs):
"""
Make a request to a method in this client. The response data is
returned from this call as native Python data structures.
This method differs from just calling the client method directly
in the following ways:
* It automatically handles the pagination rather than
relying on a separate pagination method call.
* You can pass an optional jmespath query and this query
will be applied to the data returned from the low-level
call. This allows you to tailor the returned data to be
exactly what you want.
:type op_name: str
:param op_name: The name of the request you wish to make.
:type query: str
:param query: A jmespath query that will be applied to the
data returned by the operation prior to returning
it to the user.
:type kwargs: keyword arguments
:param kwargs: Additional keyword arguments you want to pass
to the method when making the request.
"""
LOG.debug(kwargs)
if query:
query = jmespath.compile(query)
if self._client.can_paginate(op_name):
paginator = self._client.get_paginator(op_name)
results = paginator.paginate(**kwargs)
data = results.build_full_result()
else:
op = getattr(self._client, op_name)
data = op(**kwargs)
if query:
data = query.search(data)
self._record(op_name, kwargs, data)
return data
_client_cache = {}
def create_client(service_name, context):
global _client_cache
client_key = '{}:{}:{}'.format(service_name, context.region,
context.profile)
if client_key not in _client_cache:
_client_cache[client_key] = AWSClient(service_name,
context.region,
context.profile,
context.record_path)
return _client_cache[client_key]
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import yaml
import time
import os
import kappa.function
import kappa.event_source
......@@ -28,34 +30,53 @@ InfoFmtString = '\t%(message)s'
class Context(object):
def __init__(self, config_file, debug=False):
def __init__(self, config_file, environment=None, debug=False):
if debug:
self.set_logger('kappa', logging.DEBUG)
else:
self.set_logger('kappa', logging.INFO)
self._load_cache()
self.config = yaml.load(config_file)
if 'policy' in self.config.get('iam', ''):
self.policy = kappa.policy.Policy(
self, self.config['iam']['policy'])
else:
self.policy = None
if 'role' in self.config.get('iam', ''):
self.role = kappa.role.Role(
self, self.config['iam']['role'])
else:
self.role = None
self.environment = environment
self.policy = kappa.policy.Policy(
self, self.config['environments'][self.environment])
self.role = kappa.role.Role(
self, self.config['environments'][self.environment])
self.function = kappa.function.Function(
self, self.config['lambda'])
self.event_sources = []
self._create_event_sources()
def _load_cache(self):
self.cache = {}
if os.path.isdir('.kappa'):
cache_file = os.path.join('.kappa', 'cache')
if os.path.isfile(cache_file):
with open(cache_file, 'rb') as fp:
self.cache = yaml.load(fp)
def save_cache(self):
if not os.path.isdir('.kappa'):
os.mkdir('.kappa')
cache_file = os.path.join('.kappa', 'cache')
with open(cache_file, 'wb') as fp:
yaml.dump(self.cache, fp)
@property
def name(self):
return self.config.get('name', None)
@property
def profile(self):
return self.config.get('profile', None)
return self.config['environments'][self.environment]['profile']
@property
def region(self):
return self.config.get('region', None)
return self.config['environments'][self.environment]['region']
@property
def record_path(self):
return self.config.get('record_path', None)
@property
def lambda_config(self):
......@@ -134,6 +155,18 @@ class Context(object):
time.sleep(5)
self.function.create()
def deploy(self):
if self.policy:
self.policy.deploy()
if self.role:
self.role.create()
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
LOG.debug('Waiting for policy/role propogation')
time.sleep(5)
self.function.deploy()
def update_code(self):
self.function.update()
......
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
import kappa.awsclient
LOG = logging.getLogger(__name__)
......@@ -47,12 +48,12 @@ class KinesisEventSource(EventSource):
def __init__(self, context, config):
super(KinesisEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._lambda = aws.create_client('lambda')
self._lambda = kappa.awsclient.create_client('kinesis', context)
def _get_uuid(self, function):
uuid = None
response = self._lambda.list_event_source_mappings(
response = self._lambda.call(
'list_event_source_mappings',
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
......@@ -62,7 +63,8 @@ class KinesisEventSource(EventSource):
def add(self, function):
try:
response = self._lambda.create_event_source_mapping(
response = self._lambda.call(
'create_event_source_mapping',
FunctionName=function.name,
EventSourceArn=self.arn,
BatchSize=self.batch_size,
......@@ -78,7 +80,8 @@ class KinesisEventSource(EventSource):
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.update_event_source_mapping(
response = self._lambda.call(
'update_event_source_mapping',
BatchSize=self.batch_size,
Enabled=self.enabled,
FunctionName=function.arn)
......@@ -90,7 +93,8 @@ class KinesisEventSource(EventSource):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.delete_event_source_mapping(
response = self._lambda.call(
'delete_event_source_mapping',
UUID=uuid)
LOG.debug(response)
return response
......@@ -101,7 +105,8 @@ class KinesisEventSource(EventSource):
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.get_event_source_mapping(
response = self._lambda.call(
'get_event_source_mapping',
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
......@@ -121,8 +126,7 @@ class S3EventSource(EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._s3 = aws.create_client('s3')
self._s3 = kappa.awsclient.create_client('s3', config)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
......@@ -141,7 +145,8 @@ class S3EventSource(EventSource):
]
}
try:
response = self._s3.put_bucket_notification_configuration(
response = self._s3.call(
'put_bucket_notification_configuration',
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
......@@ -154,7 +159,8 @@ class S3EventSource(EventSource):
def remove(self, function):
LOG.debug('removing s3 notification')
response = self._s3.get_bucket_notification(
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' in response:
......@@ -162,14 +168,16 @@ class S3EventSource(EventSource):
if fn_arn == function.arn:
del response['CloudFunctionConfiguration']
del response['ResponseMetadata']
response = self._s3.put_bucket_notification(
response = self._s3.call(
'put_bucket_notification',
Bucket=self._get_bucket_name(),
NotificationConfiguration=response)
LOG.debug(response)
def status(self, function):
LOG.debug('status for s3 notification for %s', function.name)
response = self._s3.get_bucket_notification(
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' not in response:
......@@ -181,15 +189,15 @@ class SNSEventSource(EventSource):
def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._sns = aws.create_client('sns')
self._sns = kappa.awsclient.create_client('sns', context)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def exists(self, function):
try:
response = self._sns.list_subscriptions_by_topic(
response = self._sns.call(
'list_subscriptions_by_topic',
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
......@@ -201,7 +209,8 @@ class SNSEventSource(EventSource):
def add(self, function):
try:
response = self._sns.subscribe(
response = self._sns.call(
'subscribe',
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
......@@ -216,7 +225,8 @@ class SNSEventSource(EventSource):
try:
subscription = self.exists(function)
if subscription:
response = self._sns.unsubscribe(
response = self._sns.call(
'unsubscribe',
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
......
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import zipfile
import time
from botocore.exceptions import ClientError
import kappa.aws
import kappa.awsclient
import kappa.log
LOG = logging.getLogger(__name__)
......@@ -28,14 +30,14 @@ class Function(object):
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._lambda_svc = aws.create_client('lambda')
self._arn = None
self._lambda_client = kappa.awsclient.create_client(
'lambda', context)
self._response = None
self._log = None
@property
def name(self):
return self._config['name']
return self._context.name
@property
def runtime(self):
......@@ -73,17 +75,44 @@ class Function(object):
def permissions(self):
return self._config.get('permissions', list())
@property
def arn(self):
if self._arn is None:
def _get_response(self):
if self._response is None:
try:
response = self._lambda_svc.get_function(
self._response = self._lambda_client.call(
'get_function',
FunctionName=self.name)
LOG.debug(response)
self._arn = response['Configuration']['FunctionArn']
LOG.debug(self._response)
except Exception:
LOG.debug('Unable to find ARN for function: %s', self.name)
return self._arn
return self._response
@property
def code_sha_256(self):
response = self._get_response()
return response['Configuration']['CodeSha256']
@property
def arn(self):
response = self._get_response()
return response['Configuration']['FunctionArn']
@property
def version(self):
response = self._get_response()
return response['Configuration']['Version']
@property
def repository_type(self):
response = self._get_response()
return response['Code']['RepositoryType']
@property
def location(self):
response = self._get_response()
return response['Code']['Location']
def exists(self):
return self._get_response()
@property
def log(self):
......@@ -125,6 +154,8 @@ class Function(object):
self._zip_lambda_file(zipfile_name, lambda_fn)
def add_permissions(self):
if self.permissions:
time.sleep(5)
for permission in self.permissions:
try:
kwargs = {
......@@ -138,7 +169,8 @@ class Function(object):
source_account = permission.get('source_account', None)
if source_account:
kwargs['SourceAccount'] = source_account
response = self._lambda_svc.add_permission(**kwargs)
response = self._lambda_client.call(
'add_permission', **kwargs)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add permission')
......@@ -151,7 +183,8 @@ class Function(object):
LOG.debug('exec_role=%s', exec_role)
try:
zipdata = fp.read()
response = self._lambda_svc.create_function(
response = self._lambda_client.call(
'create_function',
FunctionName=self.name,
Code={'ZipFile': zipdata},
Runtime=self.runtime,
......@@ -168,21 +201,90 @@ class Function(object):
def update(self):
LOG.debug('updating %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
try:
zipdata = fp.read()
response = self._lambda_svc.update_function_code(
FunctionName=self.name,
ZipFile=zipdata)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update zip file')
stats = os.stat(self.zipfile_name)
if self._context.cache.get('zipfile_size') != stats.st_size:
self._context.cache['zipfile_size'] = stats.st_size
self._context.save_cache()
with open(self.zipfile_name, 'rb') as fp:
try:
zipdata = fp.read()
response = self._lambda_client.call(
'update_function_code',
FunctionName=self.name,
ZipFile=zipdata)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update zip file')
else:
LOG.info('Code has not changed')
def deploy(self):
if self.exists():
return self.update()
return self.create()
def publish_version(self, description):
LOG.debug('publishing version of ', self.name)
try:
response = self._lambda_client.call(
'publish_version',
FunctionName=self.name,
CodeSha256=self.code_sha_256,
Description=description)
LOG.debug(response)
except Exception:
LOG.exception('Unable to publish version')
return response['Version']
def list_versions(self):
LOG.debug('listing versions of ', self.name)
try:
response = self._lambda_client.call(
'list_versions_by_function',
FunctionName=self.name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to list versions')
return response['Versions']
def create_alias(self, name, description, version=None):
LOG.debug('creating alias of ', self.name)
if version is None:
version = self.version
try:
response = self._lambda_client.call(
'create_alias',
FunctionName=self.name,
Description=description,
FunctionVersion=self.version,
Name=name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to create alias')
def list_aliases(self):
LOG.debug('listing aliases of ', self.name)
try:
response = self._lambda_client.call(
'list_aliases',
FunctionName=self.name,
FunctionVersion=self.version)
LOG.debug(response)
except Exception:
LOG.exception('Unable to list aliases')
return response['Versions']
def tag(self, name, description):
version = self.publish_version(description)
self.create_alias(name, description, version)
def delete(self):
LOG.debug('deleting function %s', self.name)
response = None
try:
response = self._lambda_svc.delete_function(FunctionName=self.name)
response = self._lambda_client.call(
'delete_function',
FunctionName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('function %s: not found', self.name)
......@@ -191,7 +293,8 @@ class Function(object):
def status(self):
LOG.debug('getting status for function %s', self.name)
try:
response = self._lambda_svc.get_function(
response = self._lambda_client.call(
'get_function',
FunctionName=self.name)
LOG.debug(response)
except ClientError:
......@@ -202,7 +305,8 @@ class Function(object):
def invoke_asynch(self, data_file):
LOG.debug('_invoke_async %s', data_file)
with open(data_file) as fp:
response = self._lambda_svc.invoke_async(
response = self._lambda_client.call(
'invoke_async',
FunctionName=self.name,
InvokeArgs=fp)
LOG.debug(response)
......@@ -212,7 +316,8 @@ class Function(object):
test_data = self.test_data
LOG.debug('invoke %s', test_data)
with open(test_data) as fp:
response = self._lambda_svc.invoke(
response = self._lambda_client.call(
'invoke',
FunctionName=self.name,
InvocationType=invocation_type,
LogType='Tail',
......
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
import kappa.awsclient
LOG = logging.getLogger(__name__)
......@@ -25,12 +26,11 @@ class Log(object):
def __init__(self, context, log_group_name):
self._context = context
self.log_group_name = log_group_name
aws = kappa.aws.get_aws(self._context)
self._log_svc = aws.create_client('logs')
self._log_client = kappa.awsclient.create_client('logs', context)
def _check_for_log_group(self):
LOG.debug('checking for log group')
response = self._log_svc.describe_log_groups()
response = self._log_client.call('describe_log_groups')
log_group_names = [lg['logGroupName'] for lg in response['logGroups']]
return self.log_group_name in log_group_names
......@@ -40,7 +40,8 @@ class Log(object):
LOG.info(
'log group %s has not been created yet', self.log_group_name)
return []
response = self._log_svc.describe_log_streams(
response = self._log_client.call(
'describe_log_streams',
logGroupName=self.log_group_name)
LOG.debug(response)
return response['logStreams']
......@@ -58,7 +59,8 @@ class Log(object):
latest_stream = stream
elif stream['lastEventTimestamp'] > latest_stream['lastEventTimestamp']:
latest_stream = stream
response = self._log_svc.get_log_events(
response = self._log_client.call(
'get_log_events',
logGroupName=self.log_group_name,
logStreamName=latest_stream['logStreamName'])
LOG.debug(response)
......@@ -66,7 +68,8 @@ class Log(object):
def delete(self):
try:
response = self._log_svc.delete_log_group(
response = self._log_client.call(
'delete_log_group',
logGroupName=self.log_group_name)
LOG.debug(response)
except ClientError:
......
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
import hashlib
import kappa.aws
import kappa.awsclient
LOG = logging.getLogger(__name__)
......@@ -23,21 +26,39 @@ class Policy(object):
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._iam_svc = aws.create_client('iam')
self._arn = None
self._iam_client = kappa.awsclient.create_client('iam', self._context)
self._arn = self._config['policy'].get('arn', None)
@property
def environment(self):
return self._context.environment
@property
def name(self):
return self._config['name']
return '{}-{}-policy'.format(self._context.name, self.environment)
@property
def description(self):
return self._config.get('description', None)
return 'A kappa policy to control access to {} resources'.format(
self.environment)
@property
def document(self):
return self._config.get('document', None)
if 'resources' not in self._config['policy']:
return None
document = {"Version": "2012-10-17"}
statements = []
document['Statement'] = statements
for resource in self._config['policy']['resources']:
arn = resource['arn']
_, _, service, _ = arn.split(':', 3)
statement = {"Effect": "Allow",
"Resource": resource['arn']}
actions = []
for action in resource['actions']:
actions.append("{}:{}".format(service, action))
statement['Action'] = actions
statements.append(statement)
return json.dumps(document, indent=2, sort_keys=True)
@property
def path(self):
......@@ -52,20 +73,21 @@ class Policy(object):
return self._arn
def _find_all_policies(self):
# boto3 does not currently do pagination
# so we have to do it ourselves
policies = []
try:
response = self._iam_svc.list_policies()
policies += response['Policies']
while response['IsTruncated']:
LOG.debug('getting another page of policies')
response = self._iam_svc.list_policies(
Marker=response['Marker'])
policies += response['Policies']
response = self._iam_client.call(
'list_policies')
except Exception:
LOG.exception('Error listing policies')
return policies
return response['Policies']
def _list_versions(self):
try:
response = self._iam_client.call(
'list_policy_versions',
PolicyArn=self.arn)
except Exception:
LOG.exception('Error listing policy versions')
return response['Versions']
def exists(self):
for policy in self._find_all_policies():
......@@ -73,27 +95,72 @@ class Policy(object):
return policy
return None
def create(self):
LOG.debug('creating policy %s', self.name)
def _add_policy_version(self):
document = self.document()
if not document:
LOG.debug('not a custom policy, no need to version it')
return
versions = self._list_versions()
if len(versions) == 5:
try:
response = self._iam_client.call(
'delete_policy_version',
PolicyArn=self.arn,
VersionId=versions[-1]['VersionId'])
except Exception:
LOG.exception('Unable to delete policy version')
# update policy with a new version here
try:
response = self._iam_client.call(
'create_policy_version',
PolicyArn=self.arn,
PolicyDocument=document,
SetAsDefault=True)
LOG.debug(response)
except Exception:
LOG.exception('Error creating new Policy version')
def deploy(self):
LOG.debug('deploying policy %s', self.name)
document = self.document()
if not document:
LOG.debug('not a custom policy, no need to create it')
return
policy = self.exists()
if not policy and self.document:
with open(self.document, 'rb') as fp:
try:
response = self._iam_svc.create_policy(
Path=self.path, PolicyName=self.name,
PolicyDocument=fp.read(),
Description=self.description)
LOG.debug(response)
except Exception:
LOG.exception('Error creating Policy')
if policy:
m = hashlib.md5()
m.update(document)
policy_md5 = m.hexdigest()
LOG.debug('policy_md5: {}'.format(policy_md5))
LOG.debug('cache md5: {}'.format(
self._context.cache.get('policy_md5')))
if policy_md5 != self._context.cache.get('policy_md5'):
self._context.cache['policy_md5'] = policy_md5
self._context.save_cache()
self._add_policy_version()
else:
LOG.info('Policy unchanged')
else:
# create a new policy
try:
response = self._iam_client.call(
'create_policy',
Path=self.path, PolicyName=self.name,
PolicyDocument=document,
Description=self.description)
LOG.debug(response)
except Exception:
LOG.exception('Error creating Policy')
def delete(self):
response = None
# Only delete the policy if it has a document associated with it.
# This indicates that it was a custom policy created by kappa.
if self.arn and self.document:
document = self.document()
if self.arn and document:
LOG.debug('deleting policy %s', self.name)
response = self._iam_svc.delete_policy(PolicyArn=self.arn)
response = self._iam_client.call(
'delete_policy', PolicyArn=self.arn)
LOG.debug(response)
return response
......
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://aws.amazon.com/apache2.0/
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
import kappa.awsclient
LOG = logging.getLogger(__name__)
......@@ -39,20 +40,20 @@ class Role(object):
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._iam_svc = aws.create_client('iam')
self._iam_client = kappa.awsclient.create_client('iam', context)
self._arn = None
@property
def name(self):
return self._config['name']
return '{}-{}-role'.format(
self._context.name, self._context.environment)
@property
def arn(self):
if self._arn is None:
try:
response = self._iam_svc.get_role(
RoleName=self.name)
response = self._iam_client.call(
'get_role', RoleName=self.name)
LOG.debug(response)
self._arn = response['Role']['Arn']
except Exception:
......@@ -60,20 +61,11 @@ class Role(object):
return self._arn
def _find_all_roles(self):
# boto3 does not currently do pagination
# so we have to do it ourselves
roles = []
try:
response = self._iam_svc.list_roles()
roles += response['Roles']
while response['IsTruncated']:
LOG.debug('getting another page of roles')
response = self._iam_svc.list_roles(
Marker=response['Marker'])
roles += response['Roles']
response = self._iam_client.call('list_roles')
except Exception:
LOG.exception('Error listing roles')
return roles
return response['Roles']
def exists(self):
for role in self._find_all_roles():
......@@ -86,13 +78,15 @@ class Role(object):
role = self.exists()
if not role:
try:
response = self._iam_svc.create_role(
response = self._iam_client.call(
'create_role',
Path=self.Path, RoleName=self.name,
AssumeRolePolicyDocument=AssumeRolePolicyDocument)
LOG.debug(response)
if self._context.policy:
LOG.debug('attaching policy %s', self._context.policy.arn)
response = self._iam_svc.attach_role_policy(
response = self._iam_client.call(
'attach_role_policy',
RoleName=self.name,
PolicyArn=self._context.policy.arn)
LOG.debug(response)
......@@ -106,10 +100,12 @@ class Role(object):
LOG.debug('First detach the policy from the role')
policy_arn = self._context.policy.arn
if policy_arn:
response = self._iam_svc.detach_role_policy(
response = self._iam_client.call(
'detach_role_policy',
RoleName=self.name, PolicyArn=policy_arn)
LOG.debug(response)
response = self._iam_svc.delete_role(RoleName=self.name)
response = self._iam_client.call(
'delete_role', RoleName=self.name)
LOG.debug(response)
except ClientError:
LOG.exception('role %s not found', self.name)
......@@ -118,7 +114,8 @@ class Role(object):
def status(self):
LOG.debug('getting status for role %s', self.name)
try:
response = self._iam_svc.get_role(RoleName=self.name)
response = self._iam_client.call(
'get_role', RoleName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('role %s not found', self.name)
......
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014, 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
......@@ -11,8 +11,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from datetime import datetime
import logging
import base64
import click
......@@ -31,70 +31,97 @@ from kappa.context import Context
default=False,
help='Turn on debugging output'
)
@click.option(
'--environment',
help='Specify which environment to work with'
)
@click.pass_context
def cli(ctx, config=None, debug=False):
def cli(ctx, config=None, debug=False, environment=None):
config = config
ctx.obj['debug'] = debug
ctx.obj['config'] = config
ctx.obj['environment'] = environment
@cli.command()
@click.pass_context
def create(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('creating...')
context.create()
def deploy(ctx):
"""Deploy the Lambda function and any policies and roles required"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('deploying...')
context.deploy()
click.echo('...done')
@cli.command()
@click.pass_context
def update_code(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('updating code...')
context.update_code()
def tag(ctx):
"""Deploy the Lambda function and any policies and roles required"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('deploying...')
context.deploy()
click.echo('...done')
@cli.command()
@click.pass_context
def invoke(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Invoke the command synchronously"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('invoking...')
response = context.invoke()
log_data = base64.b64decode(response['LogResult'])
click.echo(log_data)
click.echo(response['Payload'].read())
click.echo('...done')
@cli.command()
@click.pass_context
def dryrun(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Show you what would happen but don't actually do anything"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('invoking dryrun...')
response = context.dryrun()
click.echo(response)
click.echo('...done')
@cli.command()
@click.pass_context
def invoke_async(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Invoke the Lambda function asynchronously"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('invoking async...')
response = context.invoke_async()
click.echo(response)
click.echo('...done')
@cli.command()
@click.pass_context
def tail(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Show the last 10 lines of the log file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('tailing logs...')
for e in context.tail()[-10:]:
ts = datetime.utcfromtimestamp(e['timestamp']//1000).isoformat()
click.echo("{}: {}".format(ts, e['message']))
click.echo('...done')
@cli.command()
@click.pass_context
def status(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Print a status of this Lambda function"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
status = context.status()
click.echo(click.style('Policy', bold=True))
if status['policy']:
......@@ -126,30 +153,38 @@ def status(ctx):
else:
click.echo(click.style(' None', fg='green'))
@cli.command()
@click.pass_context
def delete(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Delete the Lambda function and related policies and roles"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('deleting...')
context.delete()
click.echo('...done')
@cli.command()
@click.pass_context
def add_event_sources(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Add any event sources specified in the config file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('adding event sources...')
context.add_event_sources()
click.echo('...done')
@cli.command()
@click.pass_context
def update_event_sources(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
"""Update event sources specified in the config file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
click.echo('updating event sources...')
context.update_event_sources()
click.echo('...done')
if __name__ == '__main__':
cli(obj={})
cli(obj={})
......
boto3==1.1.1
click==4.0
boto3==1.2.0
click==5.1
PyYAML>=3.11
mock>=1.0.1
nose==1.3.1
......
......@@ -5,8 +5,8 @@ from setuptools import setup, find_packages
import os
requires = [
'boto3==1.1.1',
'click==4.0',
'boto3==1.2.0',
'click>=5.0',
'PyYAML>=3.11'
]
......@@ -22,7 +22,10 @@ setup(
packages=find_packages(exclude=['tests*']),
package_data={'kappa': ['_version']},
package_dir={'kappa': 'kappa'},
scripts=['bin/kappa'],
entry_points="""
[console_scripts]
kappa=kappa.scripts.cli:cli
""",
install_requires=requires,
license=open("LICENSE").read(),
classifiers=(
......