Mitch Garnaat

Merge pull request #3 from garnaat/refactor

WIP commit on significant refactoring of code.
language: python
python:
- "2.7"
- "3.3"
- "3.4"
install:
- pip install -r requirements.txt
- pip install coverage python-coveralls
script: nosetests tests/unit --cover-erase --with-coverage --cover-package kappa
after_success: coveralls
kappa
=====
[![Build Status](https://travis-ci.org/garnaat/kappa.svg?branch=develop)](https://travis-ci.org/garnaat/kappa)
[![Code Health](https://landscape.io/github/garnaat/kappa/develop/landscape.svg)](https://landscape.io/github/garnaat/kappa/develop)
**Kappa** is a command line tool that (hopefully) makes it easier to
deploy, update, and test functions for AWS Lambda.
......
......@@ -14,33 +14,8 @@
import logging
import click
import yaml
from kappa import Kappa
FmtString = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
def set_debug_logger(logger_names=['kappa'], stream=None):
"""
Convenience function to quickly configure full debug output
to go to the console.
"""
for logger_name in logger_names:
log = logging.getLogger(logger_name)
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler(stream)
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter(FmtString)
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
log.addHandler(ch)
from kappa.context import Context
@click.command()
......@@ -62,27 +37,26 @@ def set_debug_logger(logger_names=['kappa'], stream=None):
type=click.Choice(['deploy', 'test', 'tail', 'add-event-source', 'delete'])
)
def main(config=None, debug=False, command=None):
if debug:
set_debug_logger()
config = yaml.load(config)
kappa = Kappa(config)
ctx = Context(config, debug)
if command == 'deploy':
click.echo('Deploying ...')
kappa.deploy()
ctx.deploy()
click.echo('...done')
elif command == 'test':
click.echo('Sending test data ...')
kappa.test()
ctx.test()
click.echo('...done')
elif command == 'tail':
kappa.tail()
events = ctx.tail()
for event in events:
print(event['message'])
elif command == 'delete':
click.echo('Deleting ...')
kappa.delete()
ctx.delete()
click.echo('...done')
elif command == 'add-event-source':
click.echo('Adding event source ...')
kappa.add_event_source()
ctx.add_event_source()
click.echo('...done')
......
This diff is collapsed. Click to expand it.
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import botocore.session
class __AWS(object):
def __init__(self, profile=None, region=None):
self._client_cache = {}
self._session = botocore.session.get_session()
self._session.profile = profile
self._region = region
def create_client(self, client_name):
if client_name not in self._client_cache:
self._client_cache[client_name] = self._session.create_client(
client_name, self._region)
return self._client_cache[client_name]
__Singleton_AWS = None
def get_aws(context):
global __Singleton_AWS
if __Singleton_AWS is None:
__Singleton_AWS = __AWS(context.profile, context.region)
return __Singleton_AWS
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import yaml
import kappa.function
import kappa.event_source
import kappa.stack
LOG = logging.getLogger(__name__)
DebugFmtString = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
InfoFmtString = '\t%(message)s'
class Context(object):
def __init__(self, config_file, debug=False):
if debug:
self.set_logger('kappa', logging.DEBUG)
else:
self.set_logger('kappa', logging.INFO)
self.config = yaml.load(config_file)
self._stack = kappa.stack.Stack(
self, self.config['cloudformation'])
self.function = kappa.function.Function(
self, self.config['lambda'])
self.event_sources = []
self._create_event_sources()
@property
def profile(self):
return self.config.get('profile', None)
@property
def region(self):
return self.config.get('region', None)
@property
def cfn_config(self):
return self.config.get('cloudformation', None)
@property
def lambda_config(self):
return self.config.get('lambda', None)
@property
def exec_role_arn(self):
return self._stack.invoke_role_arn
@property
def invoke_role_arn(self):
return self._stack.invoke_role_arn
def debug(self):
self.set_logger('kappa', logging.DEBUG)
def set_logger(self, logger_name, level=logging.INFO):
"""
Convenience function to quickly configure full debug output
to go to the console.
"""
log = logging.getLogger(logger_name)
log.setLevel(level)
ch = logging.StreamHandler(None)
ch.setLevel(level)
# create formatter
if level == logging.INFO:
formatter = logging.Formatter(InfoFmtString)
else:
formatter = logging.Formatter(DebugFmtString)
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
log.addHandler(ch)
def _create_event_sources(self):
for event_source_cfg in self.config['lambda']['event_sources']:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(kappa.event_source.KinesisEventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
else:
msg = 'Unsupported event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
def add_event_sources(self):
for event_source in self.event_sources:
event_source.add(self.function)
def deploy(self):
if self._stack.exists():
self._stack.update()
else:
self._stack.create()
self.function.upload()
def test(self):
self.function.test()
def tail(self):
return self.function.tail()
def delete(self):
self._stack.delete()
self.function.delete()
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import kappa.aws
LOG = logging.getLogger(__name__)
class EventSource(object):
def __init__(self, context, config):
self._context = context
self._config = config
@property
def arn(self):
return self._config['arn']
@property
def batch_size(self):
return self._config.get('batch_size', 100)
class KinesisEventSource(EventSource):
def __init__(self, context, config):
super(KinesisEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._lambda = aws.create_client('lambda')
def _get_uuid(self, function):
uuid = None
response = self._lambda.list_event_sources(
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
if len(response['EventSources']) > 0:
uuid = response['EventSources'][0]['UUID']
return uuid
def add(self, function):
try:
response = self._lambda.add_event_source(
FunctionName=function.name,
Role=self.invoke_role_arn,
EventSource=self.arn,
BatchSize=self.batch_size)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add Kinesis event source')
def remove(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.remove_event_source(
UUID=uuid)
LOG.debug(response)
return response
class S3EventSource(EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._s3 = aws.create_client('s3')
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def _get_bucket_name(self):
return self.arn.split(':')[-1]
def add(self, function):
notification_spec = {
'CloudFunctionConfiguration': {
'Id': self._make_notification_id(function.name),
'Events': [e for e in self.config['events']],
'CloudFunction': function.arn(),
'InvocationRole': self.invoke_role_arn}}
try:
response = self._s3.put_bucket_notification(
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add S3 event source')
def remove(self, function):
response = self._s3.get_bucket_notification(
Bucket=self._get_bucket_name())
LOG.debug(response)
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import os
import zipfile
import kappa.aws
import kappa.log
LOG = logging.getLogger(__name__)
class Function(object):
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._lambda_svc = aws.create_client('lambda')
self._arn = None
self._log = None
@property
def name(self):
return self._config['name']
@property
def runtime(self):
return self._config['runtime']
@property
def handler(self):
return self._config['handler']
@property
def mode(self):
return self._config['mode']
@property
def description(self):
return self._config['description']
@property
def timeout(self):
return self._config['timeout']
@property
def memory_size(self):
return self._config['memory_size']
@property
def zipfile_name(self):
return self._config['zipfile_name']
@property
def path(self):
return self._config['path']
@property
def test_data(self):
return self._config['test_data']
@property
def arn(self):
if self._arn is None:
try:
response = self._lambda_svc.get_function_configuration(
FunctionName=self.name)
LOG.debug(response)
self._arn = response['FunctionARN']
except Exception:
LOG.debug('Unable to find ARN for function: %s' % self.name)
return self._arn
@property
def log(self):
if self._log is None:
log_group_name = '/aws/lambda/%s' % self.name
self._log = kappa.log.Log(self._context, log_group_name)
return self._log
def tail(self):
LOG.debug('tailing function: %s', self.name)
return self.log.tail()
def _zip_lambda_dir(self, zipfile_name, lambda_dir):
LOG.debug('_zip_lambda_dir: lambda_dir=%s', lambda_dir)
LOG.debug('zipfile_name=%s', zipfile_name)
relroot = os.path.abspath(lambda_dir)
with zipfile.ZipFile(zipfile_name, 'w') as zf:
for root, dirs, files in os.walk(lambda_dir):
zf.write(root, os.path.relpath(root, relroot))
for file in files:
filename = os.path.join(root, file)
if os.path.isfile(filename):
arcname = os.path.join(
os.path.relpath(root, relroot), file)
zf.write(filename, arcname)
def _zip_lambda_file(self, zipfile_name, lambda_file):
LOG.debug('_zip_lambda_file: lambda_file=%s', lambda_file)
LOG.debug('zipfile_name=%s', zipfile_name)
with zipfile.ZipFile(zipfile_name, 'w') as zf:
zf.write(lambda_file)
def zip_lambda_function(self, zipfile_name, lambda_fn):
if os.path.isdir(lambda_fn):
self._zip_lambda_dir(zipfile_name, lambda_fn)
else:
self._zip_lambda_file(zipfile_name, lambda_fn)
def upload(self):
LOG.debug('uploading %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
exec_role = self._context.exec_role_arn
try:
response = self._lambda_svc.upload_function(
FunctionName=self.name,
FunctionZip=fp,
Runtime=self.runtime,
Role=exec_role,
Handler=self.handler,
Mode=self.mode,
Description=self.description,
Timeout=self.timeout,
MemorySize=self.memory_size)
LOG.debug(response)
except Exception:
LOG.exception('Unable to upload zip file')
def delete(self):
LOG.debug('deleting function %s', self.name)
response = self._lambda_svc.delete_function(FunctionName=self.name)
LOG.debug(response)
return response
def invoke_asynch(self, data_file):
LOG.debug('_invoke_async %s', data_file)
with open(data_file) as fp:
response = self._lambda_svc.invoke_async(
FunctionName=self.name,
InvokeArgs=fp)
LOG.debug(response)
def test(self):
self.invoke_asynch(self.test_data)
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
LOG = logging.getLogger(__name__)
import kappa.aws
class Log(object):
def __init__(self, context, log_group_name):
self._context = context
self.log_group_name = log_group_name
aws = kappa.aws.get_aws(self._context)
self._log_svc = aws.create_client('logs')
def streams(self):
LOG.debug('getting streams for log group: %s', self.log_group_name)
response = self._log_svc.describe_log_streams(
logGroupName=self.log_group_name)
LOG.debug(response)
return response['logStreams']
def tail(self):
LOG.debug('tailing log group: %s', self.log_group_name)
latest_stream = None
streams = self.streams()
for stream in streams:
if not latest_stream:
latest_stream = stream
elif stream['lastEventTimestamp'] > latest_stream['lastEventTimestamp']:
latest_stream = stream
response = self._log_svc.get_log_events(
logGroupName=self.log_group_name,
logStreamName=latest_stream['logStreamName'])
LOG.debug(response)
return response['events']
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import time
import kappa.aws
LOG = logging.getLogger(__name__)
class Stack(object):
completed_states = ('CREATE_COMPLETE', 'UPDATE_COMPLETE')
failed_states = ('ROLLBACK_COMPLETE')
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(self._context)
self._cfn = aws.create_client('cloudformation')
self._iam = aws.create_client('iam')
@property
def name(self):
return self._config['stack_name']
@property
def template_path(self):
return self._config['template']
@property
def exec_role(self):
return self._config['exec_role']
@property
def exec_role_arn(self):
return self._get_role_arn(self.exec_role)
@property
def invoke_role(self):
return self._config['invoke_role']
@property
def invoke_role_arn(self):
return self._get_role_arn(self.invoke_role)
def _get_role_arn(self, role_name):
role_arn = None
try:
resources = self._cfn.list_stack_resources(
StackName=self.name)
LOG.debug(resources)
except Exception:
LOG.exception('Unable to find role ARN: %s', role_name)
for resource in resources['StackResourceSummaries']:
if resource['LogicalResourceId'] == role_name:
role = self._iam.get_role(
RoleName=resource['PhysicalResourceId'])
LOG.debug(role)
role_arn = role['Role']['Arn']
LOG.debug('role_arn: %s', role_arn)
return role_arn
def exists(self):
"""
Does Cloudformation Stack already exist?
"""
try:
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug('Stack %s exists', self.name)
except Exception:
LOG.debug('Stack %s does not exist', self.name)
response = None
return response
def wait(self):
done = False
while not done:
time.sleep(1)
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug(response)
status = response['Stacks'][0]['StackStatus']
LOG.debug('Stack status is: %s', status)
if status in self.completed_states:
done = True
if status in self.failed_states:
msg = 'Could not create stack %s: %s' % (self.name, status)
raise ValueError(msg)
def create(self):
LOG.debug('create_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.create_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to create stack')
self.wait()
def update(self):
LOG.debug('create_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.update_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception as e:
if 'ValidationError' in str(e):
LOG.info('No Updates Required')
else:
LOG.exception('Unable to update stack')
self.wait()
def delete(self):
LOG.debug('delete_stack: stack_name=%s', self.name)
try:
response = self._cfn.delete_stack(StackName=self.name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to delete stack: %s', self.name)
botocore==0.75.0
botocore==0.80.0
click==3.3
PyYAML>=3.11
mock>=1.0.1
nose==1.3.1
tox==1.7.1
......
......@@ -16,6 +16,8 @@ lambda:
memory_size: 128
timeout: 3
mode: event
event_source: arn:aws:kinesis:us-east-1:084307701560:stream/lambdastream
event_sources:
-
arn: arn:aws:kinesis:us-east-1:084307701560:stream/lambdastream
test_data: input.json
\ No newline at end of file
......
......@@ -17,6 +17,8 @@ lambda:
timeout: 3
mode: event
test_data: input.json
event_source: arn:aws:s3:::sourcebucket
s3_events:
- s3:ObjectCreated:*
\ No newline at end of file
event_sources:
-
arn: arn:aws:s3:::test-1245812163
events:
- s3:ObjectCreated:*
......
......@@ -5,7 +5,7 @@ from setuptools import setup, find_packages
import os
requires = [
'botocore==0.75.0',
'botocore==0.80.0',
'click==3.3',
'PyYAML>=3.11'
]
......
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
File mode changed
import mock
import tests.unit.responses as responses
class MockAWS(object):
def __init__(self, profile=None, region=None):
pass
def create_client(self, client_name):
client = None
if client_name == 'logs':
client = mock.Mock()
choices = responses.logs_describe_log_streams
client.describe_log_streams = mock.Mock(
side_effect=choices)
choices = responses.logs_get_log_events
client.get_log_events = mock.Mock(
side_effect=choices)
if client_name == 'cloudformation':
client = mock.Mock()
choices = responses.cfn_list_stack_resources
client.list_stack_resources = mock.Mock(
side_effect=choices)
choices = responses.cfn_describe_stacks
client.describe_stacks = mock.Mock(
side_effect=choices)
choices = responses.cfn_create_stack
client.create_stack = mock.Mock(
side_effect=choices)
choices = responses.cfn_delete_stack
client.delete_stack = mock.Mock(
side_effect=choices)
if client_name == 'iam':
client = mock.Mock()
choices = responses.iam_get_role
client.get_role = mock.Mock(
side_effect=choices)
return client
def get_aws(context):
return MockAWS()
import datetime
from dateutil.tz import tzutc
cfn_list_stack_resources = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'dd35f0ef-9699-11e4-ba38-c355c9515dbc'}, u'StackResourceSummaries': [{u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Role', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 54, 861000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestKinesis-InvokeRole-IF6VUXY9MBJN', u'LogicalResourceId': 'InvokeRole'}, {u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Role', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 55, 18000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestKinesis-ExecRole-567SAV6TZOET', u'LogicalResourceId': 'ExecRole'}, {u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Policy', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 58, 120000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestK-Invo-OMW5SDLQM8FM', u'LogicalResourceId': 'InvokeRolePolicies'}, {u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Policy', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 58, 454000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestK-Exec-APWRVKTBPPPT', u'LogicalResourceId': 'ExecRolePolicies'}]}]
iam_get_role = [{u'Role': {u'AssumeRolePolicyDocument': {u'Version': u'2012-10-17', u'Statement': [{u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u's3.amazonaws.com'}, u'Effect': u'Allow', u'Condition': {u'ArnLike': {u'sts:ExternalId': u'arn:aws:s3:::*'}}, u'Sid': u''}, {u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u'lambda.amazonaws.com'}, u'Effect': u'Allow', u'Sid': u''}]}, u'RoleId': 'AROAIEVJHUJG2I4MG5PSC', u'CreateDate': datetime.datetime(2015, 1, 6, 17, 37, 44, tzinfo=tzutc()), u'RoleName': 'TestKinesis-InvokeRole-IF6VUXY9MBJN', u'Path': '/', u'Arn': 'arn:aws:iam::0123456789012:role/TestKinesis-InvokeRole-FOO'}, 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'dd6e8d42-9699-11e4-afe6-d3625e8b365b'}}]
logs_describe_log_streams = [{u'logStreams': [{u'firstEventTimestamp': 1417042749449, u'lastEventTimestamp': 1417042749547, u'creationTime': 1417042748263, u'uploadSequenceToken': u'49540114640150833041490484409222729829873988799393975922', u'logStreamName': u'1cc48e4e613246b7974094323259d600', u'lastIngestionTime': 1417042750483, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:1cc48e4e613246b7974094323259d600', u'storedBytes': 712}, {u'firstEventTimestamp': 1417272406988, u'lastEventTimestamp': 1417272407088, u'creationTime': 1417272405690, u'uploadSequenceToken': u'49540113907504451034164105858363493278561872472363261986', u'logStreamName': u'2782a5ff88824c85a9639480d1ed7bbe', u'lastIngestionTime': 1417272408043, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:2782a5ff88824c85a9639480d1ed7bbe', u'storedBytes': 712}, {u'firstEventTimestamp': 1420569035842, u'lastEventTimestamp': 1420569035941, u'creationTime': 1420569034614, u'uploadSequenceToken': u'49540113907883563702539166025438885323514410026454245426', u'logStreamName': u'2d62991a479b4ebf9486176122b72a55', u'lastIngestionTime': 1420569036909, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:2d62991a479b4ebf9486176122b72a55', u'storedBytes': 709}, {u'firstEventTimestamp': 1418244027421, u'lastEventTimestamp': 1418244027541, u'creationTime': 1418244026907, u'uploadSequenceToken': u'49540113964795065449189116778452984186276757901477438642', u'logStreamName': u'4f44ffa128d6405591ca83b2b0f9dd2d', u'lastIngestionTime': 1418244028484, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:4f44ffa128d6405591ca83b2b0f9dd2d', u'storedBytes': 1010}, {u'firstEventTimestamp': 1418242565524, u'lastEventTimestamp': 1418242565641, u'creationTime': 1418242564196, u'uploadSequenceToken': u'49540113095132904942090446312687285178819573422397343074', u'logStreamName': u'69c5ac87e7e6415985116e8cb44e538e', u'lastIngestionTime': 1418242566558, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:69c5ac87e7e6415985116e8cb44e538e', u'storedBytes': 713}, {u'firstEventTimestamp': 1417213193378, u'lastEventTimestamp': 1417213193478, u'creationTime': 1417213192095, u'uploadSequenceToken': u'49540113336360065754596187770479764234792559857643841394', u'logStreamName': u'f68e3d87b8a14cdba338f6926f7cf50a', u'lastIngestionTime': 1417213194421, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:f68e3d87b8a14cdba338f6926f7cf50a', u'storedBytes': 711}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '2a6d4941-969b-11e4-947f-19d1c72ede7e'}}]
logs_get_log_events = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '2a7deb71-969b-11e4-914b-8f1f3d7b023d'}, u'nextForwardToken': u'f/31679748107442531967654742688057700554200447759088287749', u'events': [{u'ingestionTime': 1420569036909, u'timestamp': 1420569035842, u'message': u'2015-01-06T18:30:35.841Z\tko2sss03iq7l2pdk\tLoading event\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035899, u'message': u'START RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035940, u'message': u'2015-01-06T18:30:35.940Z\t23007242-95d2-11e4-a10e-7b2ab60a7770\t{\n "Records": [\n {\n "kinesis": {\n "partitionKey": "partitionKey-3",\n "kinesisSchemaVersion": "1.0",\n "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",\n "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"\n },\n "eventSource": "aws:kinesis",\n "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",\n "invokeIdentityArn": "arn:aws:iam::0123456789012:role/testLEBRole",\n "eventVersion": "1.0",\n "eventName": "aws:kinesis:record",\n "eventSourceARN": "arn:aws:kinesis:us-east-1:35667example:stream/examplestream",\n "awsRegion": "us-east-1"\n }\n ]\n}\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035940, u'message': u'2015-01-06T18:30:35.940Z\t23007242-95d2-11e4-a10e-7b2ab60a7770\tDecoded payload: Hello, this is a test 123.\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035941, u'message': u'END RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035941, u'message': u'REPORT RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\tDuration: 98.51 ms\tBilled Duration: 100 ms \tMemory Size: 128 MB\tMax Memory Used: 26 MB\t\n'}], u'nextBackwardToken': u'b/31679748105234758193000210997045664445208259969996226560'}]
cfn_describe_stacks = [
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': 'User Initiated', u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_IN_PROGRESS', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7d66debd-96b8-11e4-a647-4f4741ffff69'}},
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': 'User Initiated', u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_IN_PROGRESS', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7e36fff7-96b8-11e4-af44-6350f4f8c2ae'}},
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': 'User Initiated', u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_IN_PROGRESS', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7ef03e10-96b8-11e4-bc86-7f67e11abcfa'}},
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': None, u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_COMPLETE', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '8c2bff8e-96b8-11e4-be70-c5ad82c32f2d'}}]
cfn_create_stack = [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7c2f2260-96b8-11e4-be70-c5ad82c32f2d'}}]
cfn_delete_stack = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'f19af5b8-96bc-11e4-860e-11ba752b58a9'}}]
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import unittest
import mock
from kappa.log import Log
from tests.unit.mock_aws import get_aws
class TestLog(unittest.TestCase):
def setUp(self):
self.aws_patch = mock.patch('kappa.aws.get_aws', get_aws)
self.mock_aws = self.aws_patch.start()
def tearDown(self):
self.aws_patch.stop()
def test_streams(self):
mock_context = mock.Mock()
log = Log(mock_context, 'foo/bar')
streams = log.streams()
self.assertEqual(len(streams), 6)
def test_tail(self):
mock_context = mock.Mock()
log = Log(mock_context, 'foo/bar')
events = log.tail()
self.assertEqual(len(events), 6)
self.assertEqual(events[0]['ingestionTime'], 1420569036909)
self.assertIn('RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770',
events[-1]['message'])
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import unittest
import os
import mock
from kappa.stack import Stack
from tests.unit.mock_aws import get_aws
Config = {
'template': 'roles.cf',
'stack_name': 'FooBar',
'exec_role': 'ExecRole',
'invoke_role': 'InvokeRole'}
def path(filename):
return os.path.join(os.path.dirname(__file__), 'data', filename)
class TestStack(unittest.TestCase):
def setUp(self):
self.aws_patch = mock.patch('kappa.aws.get_aws', get_aws)
self.mock_aws = self.aws_patch.start()
Config['template'] = path(Config['template'])
def tearDown(self):
self.aws_patch.stop()
def test_properties(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
self.assertEqual(stack.name, Config['stack_name'])
self.assertEqual(stack.template_path, Config['template'])
self.assertEqual(stack.exec_role, Config['exec_role'])
self.assertEqual(stack.invoke_role, Config['invoke_role'])
self.assertEqual(
stack.invoke_role_arn,
'arn:aws:iam::0123456789012:role/TestKinesis-InvokeRole-FOO')
def test_exists(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
self.assertTrue(stack.exists())
def test_create(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
stack.create()
def test_delete(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
stack.delete()