Mitch Garnaat

Merge branch 'release-0.2.0'

language: python
python:
- "2.7"
- "3.3"
- "3.4"
install:
- pip install -r requirements.txt
- pip install coverage python-coveralls
script: nosetests tests/unit --cover-erase --with-coverage --cover-package kappa
after_success: coveralls
kappa
=====
[![Build Status](https://travis-ci.org/garnaat/kappa.svg?branch=develop)](https://travis-ci.org/garnaat/kappa)
[![Code Health](https://landscape.io/github/garnaat/kappa/develop/landscape.svg)](https://landscape.io/github/garnaat/kappa/develop)
**Kappa** is a command line tool that (hopefully) makes it easier to
deploy, update, and test functions for AWS Lambda.
Kappa is a command line tool. The basic command format is:
kappa <path to config file> <command> [optional command args]
Where ``command`` is one of:
* deploy - deploy the CloudFormation template containing the IAM roles and zip
the function and upload it to AWS Lambda
* test - send test data to the new Lambda function
* tail - display the most recent log events for the function (remember that it
can take several minutes before log events are available from CloudWatch)
* add-event-sources - hook up event sources to your Lambda function
* delete - delete the CloudFormation stack containing the IAM roles and delete
the Lambda function
* status - display summary information about functions, stacks, and event
sources related to your project.
The ``config file`` is a YAML format file containing all of the information
about your Lambda function.
If you use environment variables for your AWS credentials (as normally supported by boto),
simply exclude the ``profile`` element from the YAML file.
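For reference, a minimal config might look something like the following (the profile, names, and ARN are illustrative; see the samples for complete, working files):
    ---
    profile: personal
    region: us-east-1
    cloudformation:
      template: roles.cf
      stack_name: TestKinesis
      exec_role: ExecRole
      invoke_role: InvokeRole
    lambda:
      name: KinesisSample
      zipfile_name: KinesisSample.zip
      description: Testing Kinesis Lambda handler
      path: mylambda/
      handler: KinesisSample.handler
      runtime: nodejs
      memory_size: 128
      timeout: 3
      mode: event
      test_data: input.json
      event_sources:
        -
          arn: arn:aws:kinesis:us-east-1:123456789012:stream/mystream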
An example project based on a Kinesis stream can be found in
[samples/kinesis](https://github.com/garnaat/kappa/tree/develop/samples/kinesis).
The basic workflow is:
* Create your CloudFormation template with the execution and invocation roles
* Create some sample data
* Create the YAML config file with all of the information
* Run ``kappa <path-to-config> deploy`` to create roles and upload the function
* Run ``kappa <path-to-config> test`` to invoke the function with test data
* Run ``kappa <path-to-config> tail`` to view the function's output in CloudWatch logs
* Run ``kappa <path-to-config> add-event-sources`` to hook your function up to the event source
* Run ``kappa <path-to-config> tail`` to see more output
If you have to make changes in your function or in your IAM roles, simply run
``kappa deploy`` again and the changes will be uploaded as necessary.
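A typical session with a config file named ``config.yml`` might look like this (the progress messages below are the ones the CLI prints):
    $ kappa config.yml deploy
    deploying...
    ...done
    $ kappa config.yml test
    testing...
    ...done
    $ kappa config.yml tail
    tailing logs...
    ...done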
import click
from kappa.context import Context
@click.group()
@click.argument(
'config',
type=click.File('rb'),
envvar='KAPPA_CONFIG',
default=None
)
@click.option(
'--debug/--no-debug',
default=False,
help='Turn on debugging output'
)
@click.pass_context
def cli(ctx, config=None, debug=False):
    ctx.obj['debug'] = debug
    ctx.obj['config'] = config
@cli.command()
@click.pass_context
def deploy(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('deploying...')
context.deploy()
click.echo('...done')
@cli.command()
@click.pass_context
def test(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('testing...')
context.test()
click.echo('...done')
@cli.command()
@click.pass_context
def tail(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('tailing logs...')
context.tail()
click.echo('...done')
@cli.command()
@click.pass_context
def status(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
status = context.status()
click.echo(click.style('Stack', bold=True))
if status['stack']:
for stack in status['stack']['Stacks']:
line = ' {}: {}'.format(stack['StackId'], stack['StackStatus'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Function', bold=True))
if status['function']:
line = ' {}'.format(
status['function']['Configuration']['FunctionName'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Event Sources', bold=True))
if status['event_sources']:
for event_source in status['event_sources']:
if 'EventSource' in event_source:
line = ' {}: {}'.format(
event_source['EventSource'], event_source['IsActive'])
click.echo(click.style(line, fg='green'))
else:
line = ' {}'.format(
event_source['CloudFunctionConfiguration']['Id'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
@cli.command()
@click.pass_context
def delete(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('deleting...')
context.delete()
click.echo('...done')
@cli.command()
@click.pass_context
def add_event_sources(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('adding event sources...')
context.add_event_sources()
click.echo('...done')
if __name__ == '__main__':
    cli(obj={})
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
__version__ = open(os.path.join(os.path.dirname(__file__), '_version')).read()
0.2.0
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import botocore.session
class __AWS(object):
def __init__(self, profile=None, region=None):
self._client_cache = {}
self._session = botocore.session.get_session()
self._session.profile = profile
self._region = region
def create_client(self, client_name):
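        # Clients are cached per service name, so repeated calls reuse the
        # same botocore client rather than creating a new one each time.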
if client_name not in self._client_cache:
self._client_cache[client_name] = self._session.create_client(
client_name, self._region)
return self._client_cache[client_name]
__Singleton_AWS = None
def get_aws(context):
global __Singleton_AWS
if __Singleton_AWS is None:
__Singleton_AWS = __AWS(context.profile, context.region)
return __Singleton_AWS
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import yaml
import kappa.function
import kappa.event_source
import kappa.stack
LOG = logging.getLogger(__name__)
DebugFmtString = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
InfoFmtString = '\t%(message)s'
class Context(object):
def __init__(self, config_file, debug=False):
if debug:
self.set_logger('kappa', logging.DEBUG)
else:
self.set_logger('kappa', logging.INFO)
self.config = yaml.load(config_file)
self._stack = kappa.stack.Stack(
self, self.config['cloudformation'])
self.function = kappa.function.Function(
self, self.config['lambda'])
self.event_sources = []
self._create_event_sources()
@property
def profile(self):
return self.config.get('profile', None)
@property
def region(self):
return self.config.get('region', None)
@property
def cfn_config(self):
return self.config.get('cloudformation', None)
@property
def lambda_config(self):
return self.config.get('lambda', None)
@property
def exec_role_arn(self):
return self._stack.exec_role_arn
@property
def invoke_role_arn(self):
return self._stack.invoke_role_arn
def debug(self):
self.set_logger('kappa', logging.DEBUG)
def set_logger(self, logger_name, level=logging.INFO):
"""
Convenience function to quickly configure full debug output
to go to the console.
"""
log = logging.getLogger(logger_name)
log.setLevel(level)
ch = logging.StreamHandler(None)
ch.setLevel(level)
# create formatter
if level == logging.INFO:
formatter = logging.Formatter(InfoFmtString)
else:
formatter = logging.Formatter(DebugFmtString)
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
log.addHandler(ch)
def _create_event_sources(self):
for event_source_cfg in self.config['lambda']['event_sources']:
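            # The service (e.g. kinesis, s3) is the third colon-delimited
            # field of the event source ARN.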
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
kappa.event_source.KinesisEventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
else:
msg = 'Unsupported event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
def add_event_sources(self):
for event_source in self.event_sources:
event_source.add(self.function)
def deploy(self):
self._stack.update()
self.function.upload()
def test(self):
self.function.test()
def tail(self):
return self.function.tail()
def delete(self):
self._stack.delete()
self.function.delete()
for event_source in self.event_sources:
event_source.remove(self.function)
def status(self):
status = {}
status['stack'] = self._stack.status()
status['function'] = self.function.status()
status['event_sources'] = []
for event_source in self.event_sources:
status['event_sources'].append(event_source.status(self.function))
return status
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
LOG = logging.getLogger(__name__)
class EventSource(object):
def __init__(self, context, config):
self._context = context
self._config = config
@property
def arn(self):
return self._config['arn']
@property
def batch_size(self):
return self._config.get('batch_size', 100)
class KinesisEventSource(EventSource):
def __init__(self, context, config):
super(KinesisEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._lambda = aws.create_client('lambda')
def _get_uuid(self, function):
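        # Lambda identifies event source mappings by UUID, so look up the
        # mapping for this function and stream ARN, if one exists.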
uuid = None
response = self._lambda.list_event_sources(
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
if len(response['EventSources']) > 0:
uuid = response['EventSources'][0]['UUID']
return uuid
def add(self, function):
try:
response = self._lambda.add_event_source(
FunctionName=function.name,
Role=self._context.invoke_role_arn,
EventSource=self.arn,
BatchSize=self.batch_size)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add Kinesis event source')
def remove(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.remove_event_source(
UUID=uuid)
LOG.debug(response)
return response
def status(self, function):
LOG.debug('getting status for event source %s', self.arn)
try:
response = self._lambda.get_event_source(
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
LOG.debug('event source %s does not exist', self.arn)
response = None
return response
class S3EventSource(EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._s3 = aws.create_client('s3')
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def _get_bucket_name(self):
return self.arn.split(':')[-1]
def add(self, function):
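        # Build the single CloudFunctionConfiguration block that
        # put_bucket_notification expects for a Lambda target.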
notification_spec = {
'CloudFunctionConfiguration': {
'Id': self._make_notification_id(function.name),
'Events': [e for e in self._config['events']],
'CloudFunction': function.arn,
'InvocationRole': self._context.invoke_role_arn}}
try:
response = self._s3.put_bucket_notification(
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add S3 event source')
def remove(self, function):
LOG.debug('removing s3 notification')
response = self._s3.get_bucket_notification(
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' in response:
fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
if fn_arn == function.arn:
del response['CloudFunctionConfiguration']
response = self._s3.put_bucket_notification(
Bucket=self._get_bucket_name(),
NotificationConfiguration=response)
LOG.debug(response)
def status(self, function):
LOG.debug('status for s3 notification for %s', function.name)
response = self._s3.get_bucket_notification(
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' not in response:
response = None
return response
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import os
import zipfile
from botocore.exceptions import ClientError
import kappa.aws
import kappa.log
LOG = logging.getLogger(__name__)
class Function(object):
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._lambda_svc = aws.create_client('lambda')
self._arn = None
self._log = None
@property
def name(self):
return self._config['name']
@property
def runtime(self):
return self._config['runtime']
@property
def handler(self):
return self._config['handler']
@property
def mode(self):
return self._config['mode']
@property
def description(self):
return self._config['description']
@property
def timeout(self):
return self._config['timeout']
@property
def memory_size(self):
return self._config['memory_size']
@property
def zipfile_name(self):
return self._config['zipfile_name']
@property
def path(self):
return self._config['path']
@property
def test_data(self):
return self._config['test_data']
@property
def arn(self):
if self._arn is None:
try:
response = self._lambda_svc.get_function_configuration(
FunctionName=self.name)
LOG.debug(response)
self._arn = response['FunctionARN']
except Exception:
LOG.debug('Unable to find ARN for function: %s', self.name)
return self._arn
@property
def log(self):
if self._log is None:
log_group_name = '/aws/lambda/%s' % self.name
self._log = kappa.log.Log(self._context, log_group_name)
return self._log
def tail(self):
LOG.debug('tailing function: %s', self.name)
return self.log.tail()
def _zip_lambda_dir(self, zipfile_name, lambda_dir):
LOG.debug('_zip_lambda_dir: lambda_dir=%s', lambda_dir)
LOG.debug('zipfile_name=%s', zipfile_name)
relroot = os.path.abspath(lambda_dir)
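        # Archive entries are stored relative to the lambda directory so
        # that its contents end up at the root of the zip file.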
with zipfile.ZipFile(zipfile_name, 'w',
compression=zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(lambda_dir):
zf.write(root, os.path.relpath(root, relroot))
for filename in files:
filepath = os.path.join(root, filename)
if os.path.isfile(filepath):
arcname = os.path.join(
os.path.relpath(root, relroot), filename)
zf.write(filepath, arcname)
def _zip_lambda_file(self, zipfile_name, lambda_file):
LOG.debug('_zip_lambda_file: lambda_file=%s', lambda_file)
LOG.debug('zipfile_name=%s', zipfile_name)
with zipfile.ZipFile(zipfile_name, 'w',
compression=zipfile.ZIP_DEFLATED) as zf:
zf.write(lambda_file)
def zip_lambda_function(self, zipfile_name, lambda_fn):
if os.path.isdir(lambda_fn):
self._zip_lambda_dir(zipfile_name, lambda_fn)
else:
self._zip_lambda_file(zipfile_name, lambda_fn)
def upload(self):
LOG.debug('uploading %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
exec_role = self._context.exec_role_arn
try:
response = self._lambda_svc.upload_function(
FunctionName=self.name,
FunctionZip=fp,
Runtime=self.runtime,
Role=exec_role,
Handler=self.handler,
Mode=self.mode,
Description=self.description,
Timeout=self.timeout,
MemorySize=self.memory_size)
LOG.debug(response)
except Exception:
LOG.exception('Unable to upload zip file')
def delete(self):
LOG.debug('deleting function %s', self.name)
response = self._lambda_svc.delete_function(FunctionName=self.name)
LOG.debug(response)
return response
def status(self):
LOG.debug('getting status for function %s', self.name)
try:
response = self._lambda_svc.get_function(
FunctionName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('function %s not found', self.name)
response = None
return response
def invoke_asynch(self, data_file):
        LOG.debug('invoke_asynch %s', data_file)
with open(data_file) as fp:
response = self._lambda_svc.invoke_async(
FunctionName=self.name,
InvokeArgs=fp)
LOG.debug(response)
def test(self):
self.invoke_asynch(self.test_data)
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import kappa.aws
LOG = logging.getLogger(__name__)
class Log(object):
def __init__(self, context, log_group_name):
self._context = context
self.log_group_name = log_group_name
aws = kappa.aws.get_aws(self._context)
self._log_svc = aws.create_client('logs')
def _check_for_log_group(self):
LOG.debug('checking for log group')
response = self._log_svc.describe_log_groups()
log_group_names = [lg['logGroupName'] for lg in response['logGroups']]
return self.log_group_name in log_group_names
def streams(self):
LOG.debug('getting streams for log group: %s', self.log_group_name)
if not self._check_for_log_group():
LOG.info(
'log group %s has not been created yet', self.log_group_name)
return []
response = self._log_svc.describe_log_streams(
logGroupName=self.log_group_name)
LOG.debug(response)
return response['logStreams']
def tail(self):
LOG.debug('tailing log group: %s', self.log_group_name)
if not self._check_for_log_group():
LOG.info(
'log group %s has not been created yet', self.log_group_name)
return []
latest_stream = None
streams = self.streams()
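        # describe_log_streams returns streams ordered by name, so scan for
        # the stream with the most recent lastEventTimestamp.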
for stream in streams:
if not latest_stream:
latest_stream = stream
elif stream['lastEventTimestamp'] > latest_stream['lastEventTimestamp']:
latest_stream = stream
        if latest_stream is None:
            return []
        response = self._log_svc.get_log_events(
            logGroupName=self.log_group_name,
            logStreamName=latest_stream['logStreamName'])
LOG.debug(response)
return response['events']
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import time
import kappa.aws
LOG = logging.getLogger(__name__)
class Stack(object):
completed_states = ('CREATE_COMPLETE', 'UPDATE_COMPLETE')
failed_states = ('ROLLBACK_COMPLETE',)
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(self._context)
self._cfn = aws.create_client('cloudformation')
self._iam = aws.create_client('iam')
@property
def name(self):
return self._config['stack_name']
@property
def template_path(self):
return self._config['template']
@property
def exec_role(self):
return self._config['exec_role']
@property
def exec_role_arn(self):
return self._get_role_arn(self.exec_role)
@property
def invoke_role(self):
return self._config['invoke_role']
@property
def invoke_role_arn(self):
return self._get_role_arn(self.invoke_role)
def _get_role_arn(self, role_name):
role_arn = None
try:
resources = self._cfn.list_stack_resources(
StackName=self.name)
LOG.debug(resources)
        except Exception:
            LOG.exception('Unable to find role ARN: %s', role_name)
            return role_arn
for resource in resources['StackResourceSummaries']:
if resource['LogicalResourceId'] == role_name:
role = self._iam.get_role(
RoleName=resource['PhysicalResourceId'])
LOG.debug(role)
role_arn = role['Role']['Arn']
LOG.debug('role_arn: %s', role_arn)
return role_arn
def exists(self):
"""
Does Cloudformation Stack already exist?
"""
try:
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug('Stack %s exists', self.name)
except Exception:
LOG.debug('Stack %s does not exist', self.name)
response = None
return response
def wait(self):
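        # Poll describe_stacks once a second until the stack reaches a
        # terminal state; a rollback is treated as a failure.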
done = False
while not done:
time.sleep(1)
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug(response)
status = response['Stacks'][0]['StackStatus']
LOG.debug('Stack status is: %s', status)
if status in self.completed_states:
done = True
if status in self.failed_states:
msg = 'Could not create stack %s: %s' % (self.name, status)
raise ValueError(msg)
def _create(self):
LOG.debug('create_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.create_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to create stack')
self.wait()
def _update(self):
        LOG.debug('update_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.update_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception as e:
if 'ValidationError' in str(e):
LOG.info('No Updates Required')
else:
LOG.exception('Unable to update stack')
self.wait()
def update(self):
if self.exists():
self._update()
else:
self._create()
def status(self):
return self.exists()
def delete(self):
LOG.debug('delete_stack: stack_name=%s', self.name)
try:
response = self._cfn.delete_stack(StackName=self.name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to delete stack: %s', self.name)
botocore==0.94.0
click==3.3
PyYAML>=3.11
mock>=1.0.1
nose==1.3.1
tox==1.7.1
Kinesis Example
===============
This is a simple Lambda example that listens for events on a Kinesis stream.
This example is based on the one from the
[AWS Lambda Documentation](http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser.html). The Lambda function in this example doesn't do much beyond logging some data, but it shows how all of the pieces fit together and how to use ``kappa`` to deploy a Lambda function.
What You Need To Do
-------------------
1. Edit the ``config.yml`` file. Specifically, you will need to edit the ``profile`` and ``region`` values as well as the ``event_sources`` section.
2. Run ``kappa config.yml deploy``
3. Run ``kappa config.yml test``
4. Run ``kappa config.yml tail``. You may have to run this command a few times before the log events become available in CloudWatch Logs.
5. Run ``kappa config.yml add-event-sources``
6. Try sending data to the Kinesis stream and then tailing the logs again to see if your function is getting called.
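For example, if you have the AWS CLI installed, something like the following should put a record on the stream (the stream name here is illustrative; use the one from your ``event_sources`` ARN):
    aws kinesis put-record --stream-name lambdastream \
        --partition-key test --data "Hello, this is a test."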
lambda:
memory_size: 128
timeout: 3
mode: event
event_sources:
-
arn: arn:aws:kinesis:us-east-1:084307701560:stream/lambdastream
test_data: input.json
---
profile: personal
region: us-east-1
cloudformation:
template: roles.cf
stack_name: TestS3
exec_role: ExecRole
invoke_role: InvokeRole
lambda:
name: S3Sample
zipfile_name: S3Sample.zip
description: Testing S3 Lambda handler
path: examplefolder/
handler: CreateThumbnail.handler
runtime: nodejs
memory_size: 128
timeout: 3
mode: event
test_data: input.json
event_sources:
-
arn: arn:aws:s3:::test-1245812163
events:
- s3:ObjectCreated:*
// dependencies
var async = require('async');
var AWS = require('aws-sdk');
var gm = require('gm')
.subClass({ imageMagick: true }); // Enable ImageMagick integration.
var util = require('util');
// constants
var MAX_WIDTH = 100;
var MAX_HEIGHT = 100;
// get reference to S3 client
var s3 = new AWS.S3();
exports.handler = function(event, context) {
// Read options from the event.
console.log("Reading options from event:\n", util.inspect(event, {depth: 5}));
var srcBucket = event.Records[0].s3.bucket.name;
var srcKey = event.Records[0].s3.object.key;
var dstBucket = srcBucket + "resized";
var dstKey = "resized-" + srcKey;
// Sanity check: validate that source and destination are different buckets.
if (srcBucket == dstBucket) {
console.error("Destination bucket must not match source bucket.");
return;
}
// Infer the image type.
var typeMatch = srcKey.match(/\.([^.]*)$/);
if (!typeMatch) {
console.error('unable to infer image type for key ' + srcKey);
return;
}
var imageType = typeMatch[1];
if (imageType != "jpg" && imageType != "png") {
console.log('skipping non-image ' + srcKey);
return;
}
// Download the image from S3, transform, and upload to a different S3 bucket.
async.waterfall([
function download(next) {
// Download the image from S3 into a buffer.
s3.getObject({
Bucket: srcBucket,
Key: srcKey
},
next);
},
function transform(response, next) {
gm(response.Body).size(function(err, size) {
// Infer the scaling factor to avoid stretching the image unnaturally.
var scalingFactor = Math.min(
MAX_WIDTH / size.width,
MAX_HEIGHT / size.height
);
var width = scalingFactor * size.width;
var height = scalingFactor * size.height;
// Transform the image buffer in memory.
this.resize(width, height)
.toBuffer(imageType, function(err, buffer) {
if (err) {
next(err);
} else {
next(null, response.ContentType, buffer);
}
});
});
},
function upload(contentType, data, next) {
// Stream the transformed image to a different S3 bucket.
s3.putObject({
Bucket: dstBucket,
Key: dstKey,
Body: data,
ContentType: contentType
},
next);
}
], function (err) {
if (err) {
console.error(
'Unable to resize ' + srcBucket + '/' + srcKey +
' and upload to ' + dstBucket + '/' + dstKey +
' due to an error: ' + err
);
} else {
console.log(
'Successfully resized ' + srcBucket + '/' + srcKey +
' and uploaded to ' + dstBucket + '/' + dstKey
);
}
context.done();
}
);
};
{
"Records":[
{
"eventVersion":"2.0",
"eventSource":"aws:s3",
"awsRegion":"us-east-1",
"eventTime":"1970-01-01T00:00:00.000Z",
"eventName":"ObjectCreated:Put",
"userIdentity":{
"principalId":"AIDAJDPLRKLG7UEXAMPLE"
},
"requestParameters":{
"sourceIPAddress":"127.0.0.1"
},
"responseElements":{
"x-amz-request-id":"C3D13FE58DE4C810",
"x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"testConfigRule",
"bucket":{
"name":"sourcebucket",
"ownerIdentity":{
"principalId":"A3NL1KOZZKExample"
},
"arn":"arn:aws:s3:::sourcebucket"
},
"object":{
"key":"HappyFace.jpg",
"size":1024,
"eTag":"d41d8cd98f00b204e9800998ecf8427e",
"versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko"
}
}
}
]
}
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"ExecRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "lambda.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ]
} ]
}
}
},
"ExecRolePolicies": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyName": "ExecRolePolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::*"
]
} ]
},
"Roles": [ { "Ref": "ExecRole" } ]
}
},
"InvokeRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "s3.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ],
"Condition": {
"ArnLike": {
"sts:ExternalId": "arn:aws:s3:::*"
}
}
} ]
}
}
},
"InvokeRolePolicies": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyName": "ExecRolePolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Resource": [
"*"
],
"Action": [
"lambda:InvokeFunction"
]
}
]
},
"Roles": [ { "Ref": "InvokeRole" } ]
}
}
}
}
from setuptools import setup, find_packages
import os
requires = [
    'botocore==0.94.0',
'click==3.3',
'PyYAML>=3.11'
]
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import mock
import tests.unit.responses as responses
class MockAWS(object):
def __init__(self, profile=None, region=None):
pass
def create_client(self, client_name):
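        # Each mocked API method pops its next canned response from the
        # matching list in tests.unit.responses via mock's side_effect.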
client = None
if client_name == 'logs':
client = mock.Mock()
choices = responses.logs_describe_log_groups
client.describe_log_groups = mock.Mock(
side_effect=choices)
choices = responses.logs_describe_log_streams
client.describe_log_streams = mock.Mock(
side_effect=choices)
choices = responses.logs_get_log_events
client.get_log_events = mock.Mock(
side_effect=choices)
if client_name == 'cloudformation':
client = mock.Mock()
choices = responses.cfn_list_stack_resources
client.list_stack_resources = mock.Mock(
side_effect=choices)
choices = responses.cfn_describe_stacks
client.describe_stacks = mock.Mock(
side_effect=choices)
choices = responses.cfn_create_stack
client.create_stack = mock.Mock(
side_effect=choices)
choices = responses.cfn_delete_stack
client.delete_stack = mock.Mock(
side_effect=choices)
if client_name == 'iam':
client = mock.Mock()
choices = responses.iam_get_role
client.get_role = mock.Mock(
side_effect=choices)
return client
def get_aws(context):
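    # Drop-in replacement for kappa.aws.get_aws; the unit tests patch it
    # in with mock.patch and the context argument is ignored.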
return MockAWS()
import datetime
from dateutil.tz import tzutc
cfn_list_stack_resources = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'dd35f0ef-9699-11e4-ba38-c355c9515dbc'}, u'StackResourceSummaries': [{u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Role', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 54, 861000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestKinesis-InvokeRole-IF6VUXY9MBJN', u'LogicalResourceId': 'InvokeRole'}, {u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Role', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 55, 18000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestKinesis-ExecRole-567SAV6TZOET', u'LogicalResourceId': 'ExecRole'}, {u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Policy', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 58, 120000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestK-Invo-OMW5SDLQM8FM', u'LogicalResourceId': 'InvokeRolePolicies'}, {u'ResourceStatus': 'CREATE_COMPLETE', u'ResourceType': 'AWS::IAM::Policy', u'ResourceStatusReason': None, u'LastUpdatedTimestamp': datetime.datetime(2015, 1, 6, 17, 37, 58, 454000, tzinfo=tzutc()), u'PhysicalResourceId': 'TestK-Exec-APWRVKTBPPPT', u'LogicalResourceId': 'ExecRolePolicies'}]}]
iam_get_role = [{u'Role': {u'AssumeRolePolicyDocument': {u'Version': u'2012-10-17', u'Statement': [{u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u's3.amazonaws.com'}, u'Effect': u'Allow', u'Condition': {u'ArnLike': {u'sts:ExternalId': u'arn:aws:s3:::*'}}, u'Sid': u''}, {u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u'lambda.amazonaws.com'}, u'Effect': u'Allow', u'Sid': u''}]}, u'RoleId': 'AROAIEVJHUJG2I4MG5PSC', u'CreateDate': datetime.datetime(2015, 1, 6, 17, 37, 44, tzinfo=tzutc()), u'RoleName': 'TestKinesis-InvokeRole-IF6VUXY9MBJN', u'Path': '/', u'Arn': 'arn:aws:iam::0123456789012:role/TestKinesis-InvokeRole-FOO'}, 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'dd6e8d42-9699-11e4-afe6-d3625e8b365b'}}]
logs_describe_log_groups = [{'ResponseMetadata': {'HTTPStatusCode': 200,
'RequestId': 'da962431-afed-11e4-8c17-1776597471e6'},
u'logGroups': [{u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample*',
u'creationTime': 1423175925414,
u'logGroupName': u'foo/bar',
u'metricFilterCount': 1,
u'storedBytes': 0}]},
{'ResponseMetadata': {'HTTPStatusCode': 200,
'RequestId': 'da962431-afed-11e4-8c17-1776597471e6'},
u'logGroups': [{u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample*',
u'creationTime': 1423175925414,
u'logGroupName': u'foo/bar',
u'metricFilterCount': 1,
u'storedBytes': 0}]}]
logs_describe_log_streams = [{u'logStreams': [{u'firstEventTimestamp': 1417042749449, u'lastEventTimestamp': 1417042749547, u'creationTime': 1417042748263, u'uploadSequenceToken': u'49540114640150833041490484409222729829873988799393975922', u'logStreamName': u'1cc48e4e613246b7974094323259d600', u'lastIngestionTime': 1417042750483, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:1cc48e4e613246b7974094323259d600', u'storedBytes': 712}, {u'firstEventTimestamp': 1417272406988, u'lastEventTimestamp': 1417272407088, u'creationTime': 1417272405690, u'uploadSequenceToken': u'49540113907504451034164105858363493278561872472363261986', u'logStreamName': u'2782a5ff88824c85a9639480d1ed7bbe', u'lastIngestionTime': 1417272408043, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:2782a5ff88824c85a9639480d1ed7bbe', u'storedBytes': 712}, {u'firstEventTimestamp': 1420569035842, u'lastEventTimestamp': 1420569035941, u'creationTime': 1420569034614, u'uploadSequenceToken': u'49540113907883563702539166025438885323514410026454245426', u'logStreamName': u'2d62991a479b4ebf9486176122b72a55', u'lastIngestionTime': 1420569036909, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:2d62991a479b4ebf9486176122b72a55', u'storedBytes': 709}, {u'firstEventTimestamp': 1418244027421, u'lastEventTimestamp': 1418244027541, u'creationTime': 1418244026907, u'uploadSequenceToken': u'49540113964795065449189116778452984186276757901477438642', u'logStreamName': u'4f44ffa128d6405591ca83b2b0f9dd2d', u'lastIngestionTime': 1418244028484, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:4f44ffa128d6405591ca83b2b0f9dd2d', u'storedBytes': 1010}, {u'firstEventTimestamp': 1418242565524, u'lastEventTimestamp': 1418242565641, u'creationTime': 1418242564196, u'uploadSequenceToken': u'49540113095132904942090446312687285178819573422397343074', u'logStreamName': u'69c5ac87e7e6415985116e8cb44e538e', u'lastIngestionTime': 1418242566558, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:69c5ac87e7e6415985116e8cb44e538e', u'storedBytes': 713}, {u'firstEventTimestamp': 1417213193378, u'lastEventTimestamp': 1417213193478, u'creationTime': 1417213192095, u'uploadSequenceToken': u'49540113336360065754596187770479764234792559857643841394', u'logStreamName': u'f68e3d87b8a14cdba338f6926f7cf50a', u'lastIngestionTime': 1417213194421, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:f68e3d87b8a14cdba338f6926f7cf50a', u'storedBytes': 711}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '2a6d4941-969b-11e4-947f-19d1c72ede7e'}}]
logs_get_log_events = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '2a7deb71-969b-11e4-914b-8f1f3d7b023d'}, u'nextForwardToken': u'f/31679748107442531967654742688057700554200447759088287749', u'events': [{u'ingestionTime': 1420569036909, u'timestamp': 1420569035842, u'message': u'2015-01-06T18:30:35.841Z\tko2sss03iq7l2pdk\tLoading event\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035899, u'message': u'START RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035940, u'message': u'2015-01-06T18:30:35.940Z\t23007242-95d2-11e4-a10e-7b2ab60a7770\t{\n "Records": [\n {\n "kinesis": {\n "partitionKey": "partitionKey-3",\n "kinesisSchemaVersion": "1.0",\n "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",\n "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"\n },\n "eventSource": "aws:kinesis",\n "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",\n "invokeIdentityArn": "arn:aws:iam::0123456789012:role/testLEBRole",\n "eventVersion": "1.0",\n "eventName": "aws:kinesis:record",\n "eventSourceARN": "arn:aws:kinesis:us-east-1:35667example:stream/examplestream",\n "awsRegion": "us-east-1"\n }\n ]\n}\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035940, u'message': u'2015-01-06T18:30:35.940Z\t23007242-95d2-11e4-a10e-7b2ab60a7770\tDecoded payload: Hello, this is a test 123.\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035941, u'message': u'END RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035941, u'message': u'REPORT RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\tDuration: 98.51 ms\tBilled Duration: 100 ms \tMemory Size: 128 MB\tMax Memory Used: 26 MB\t\n'}], u'nextBackwardToken': u'b/31679748105234758193000210997045664445208259969996226560'}]
cfn_describe_stacks = [
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': 'User Initiated', u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_IN_PROGRESS', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7d66debd-96b8-11e4-a647-4f4741ffff69'}},
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': 'User Initiated', u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_IN_PROGRESS', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7e36fff7-96b8-11e4-af44-6350f4f8c2ae'}},
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': 'User Initiated', u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_IN_PROGRESS', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7ef03e10-96b8-11e4-bc86-7f67e11abcfa'}},
{u'Stacks': [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', u'Description': None, u'Tags': [], u'StackStatusReason': None, u'CreationTime': datetime.datetime(2015, 1, 7, 21, 59, 43, 208000, tzinfo=tzutc()), u'Capabilities': ['CAPABILITY_IAM'], u'StackName': 'TestKinesis', u'NotificationARNs': [], u'StackStatus': 'CREATE_COMPLETE', u'DisableRollback': False}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '8c2bff8e-96b8-11e4-be70-c5ad82c32f2d'}}]
cfn_create_stack = [{u'StackId': 'arn:aws:cloudformation:us-east-1:084307701560:stack/TestKinesis/7c4ae730-96b8-11e4-94cc-5001dc3ed8d2', 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '7c2f2260-96b8-11e4-be70-c5ad82c32f2d'}}]
cfn_delete_stack = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'f19af5b8-96bc-11e4-860e-11ba752b58a9'}}]
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import unittest
import mock
from kappa.log import Log
from tests.unit.mock_aws import get_aws
class TestLog(unittest.TestCase):
def setUp(self):
self.aws_patch = mock.patch('kappa.aws.get_aws', get_aws)
self.mock_aws = self.aws_patch.start()
def tearDown(self):
self.aws_patch.stop()
def test_streams(self):
mock_context = mock.Mock()
log = Log(mock_context, 'foo/bar')
streams = log.streams()
self.assertEqual(len(streams), 6)
def test_tail(self):
mock_context = mock.Mock()
log = Log(mock_context, 'foo/bar')
events = log.tail()
self.assertEqual(len(events), 6)
self.assertEqual(events[0]['ingestionTime'], 1420569036909)
self.assertIn('RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770',
events[-1]['message'])
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import unittest
import os
import mock
from kappa.stack import Stack
from tests.unit.mock_aws import get_aws
Config = {
'template': 'roles.cf',
'stack_name': 'FooBar',
'exec_role': 'ExecRole',
'invoke_role': 'InvokeRole'}
def path(filename):
return os.path.join(os.path.dirname(__file__), 'data', filename)
class TestStack(unittest.TestCase):
def setUp(self):
self.aws_patch = mock.patch('kappa.aws.get_aws', get_aws)
self.mock_aws = self.aws_patch.start()
Config['template'] = path(Config['template'])
def tearDown(self):
self.aws_patch.stop()
def test_properties(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
self.assertEqual(stack.name, Config['stack_name'])
self.assertEqual(stack.template_path, Config['template'])
self.assertEqual(stack.exec_role, Config['exec_role'])
self.assertEqual(stack.invoke_role, Config['invoke_role'])
self.assertEqual(
stack.invoke_role_arn,
'arn:aws:iam::0123456789012:role/TestKinesis-InvokeRole-FOO')
def test_exists(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
self.assertTrue(stack.exists())
def test_update(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
stack.update()
def test_delete(self):
mock_context = mock.Mock()
stack = Stack(mock_context, Config)
stack.delete()