Mitch Garnaat

Merge pull request #14 from garnaat/update-for-ga

Update for ga
......@@ -12,37 +12,52 @@ There are quite a few steps involved in developing a Lambda function.
You have to:
* Write the function itself (JavaScript only for now)
* Create the IAM role required by the Lambda function itself (the executing
  role) to allow it access to any resources it needs to do its job
* Add additional permissions to the Lambda function if it is going to be used
  in a Push model (e.g. S3, SNS) rather than a Pull model
* Zip the function and any dependencies and upload it to AWS Lambda
* Test the function with mock data
* Retrieve the output of the function from CloudWatch Logs
* Add an event source to the function
* View the output of the live function

Kappa tries to help you with some of this. It allows you to create an IAM
managed policy or use an existing one. It creates the IAM execution role for
you and associates the policy with it. Kappa will zip up the function and
any dependencies and upload them to AWS Lambda. It also sends test data
to the uploaded function and finds the related CloudWatch log stream and
displays the log events. Finally, it will add the event source to turn
your function on.

If you need to make changes, kappa will allow you to easily update your Lambda
function with new code or update your event sources as needed.

Getting Started
---------------
Kappa is a command line tool. The basic command format is:
kappa <path to config file> <command> [optional command args]
Where ``command`` is one of:
* create - creates the IAM policy (if necessary), the IAM role, and zips and
  uploads the Lambda function code to the Lambda service
* invoke - make a synchronous call to your Lambda function, passing test data,
  and display the resulting log data
* invoke_async - make an asynchronous call to your Lambda function, passing
  test data
* dryrun - make the call but only check things like permissions and report
  back; don't actually run the code
* tail - display the most recent log events for the function (remember that it
can take several minutes before log events are available from CloudWatch)
* add-event-sources - hook up an event source to your Lambda function
* delete - delete the Lambda function, remove any event sources, delete the IAM
  policy and role
* update_code - upload new code for your Lambda function
* update_event_sources - update the event sources based on the information in
  your kappa config file
* status - display summary information about the policy, role, function, and
  event sources related to your project
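
For example, a typical session against the sample Kinesis project (the config
path shown here is illustrative; use the path to your own config file) might
look like::

    kappa samples/kinesis/config.yml create
    kappa samples/kinesis/config.yml invoke
    kappa samples/kinesis/config.yml add-event-sources
    kappa samples/kinesis/config.yml tail
    kappa samples/kinesis/config.yml delete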
......@@ -58,14 +73,12 @@ An example project based on a Kinesis stream can be found in
The basic workflow is:
* Create your Lambda function
* Create any custom IAM policy you need to execute your Lambda function
* Create some sample data
* Create the YAML config file with all of the information
* Run ``kappa <path-to-config> create`` to create the roles and upload the function
* Run ``kappa <path-to-config> invoke`` to invoke the function with test data
* Run ``kappa <path-to-config> update_code`` to upload new code for your Lambda
  function
* Run ``kappa <path-to-config> add-event-source`` to hook your function up to the
  event source
* Run ``kappa <path-to-config> tail`` to see more output
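
The same steps can also be driven programmatically. A minimal sketch (assuming
the ``kappa`` package layout shown in this changeset, where ``Context`` takes
an open YAML config file and an optional debug flag)::

    from kappa.context import Context

    with open('config.yml') as config_file:
        ctx = Context(config_file, False)
        ctx.create()              # IAM policy/role plus function upload
        response = ctx.invoke()   # synchronous test invocation
        print(response)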
......
......@@ -13,6 +13,7 @@
# language governing permissions and limitations under the License.
from datetime import datetime
import logging
import base64
import click
......@@ -38,18 +39,46 @@ def cli(ctx, config=None, debug=False):
@cli.command()
@click.pass_context
def create(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('creating...')
context.create()
click.echo('...done')
@cli.command()
@click.pass_context
def update_code(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('updating code...')
context.update_code()
click.echo('...done')
@cli.command()
@click.pass_context
def invoke(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('invoking...')
response = context.invoke()
log_data = base64.b64decode(response['LogResult'])
click.echo(log_data)
click.echo('...done')
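# Note: the invoke command above can display log output because
# Function._invoke (later in this changeset) calls the Lambda Invoke API with
# LogType='Tail'; the returned 'LogResult' is the base64-encoded tail of the
# invocation's CloudWatch log, hence the b64decode before echoing it.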
@cli.command()
@click.pass_context
def dryrun(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('invoking dryrun...')
response = context.dryrun()
click.echo(response)
click.echo('...done')
@cli.command()
@click.pass_context
def invoke_async(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('invoking async...')
response = context.invoke_async()
click.echo(response)
click.echo('...done')
@cli.command()
......@@ -67,33 +96,35 @@ def tail(ctx):
def status(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
status = context.status()
click.echo(click.style('Policy', bold=True))
if status['policy']:
line = ' {} ({})'.format(
status['policy']['PolicyName'],
status['policy']['Arn'])
click.echo(click.style(line, fg='green'))
click.echo(click.style('Role', bold=True))
if status['role']:
line = ' {} ({})'.format(
status['role']['Role']['RoleName'],
status['role']['Role']['Arn'])
click.echo(click.style(line, fg='green'))
click.echo(click.style('Function', bold=True))
if status['function']:
line = ' {} ({})'.format(
status['function']['Configuration']['FunctionName'],
status['function']['Configuration']['FunctionArn'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Event Sources', bold=True))
if status['event_sources']:
for event_source in status['event_sources']:
if event_source:
line = ' {}: {}'.format(
event_source['EventSourceArn'], event_source['State'])
click.echo(click.style(line, fg='green'))
else:
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style(' None', fg='green'))
@cli.command()
@click.pass_context
......@@ -111,6 +142,14 @@ def add_event_sources(ctx):
context.add_event_sources()
click.echo('...done')
@cli.command()
@click.pass_context
def update_event_sources(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('updating event sources...')
context.update_event_sources()
click.echo('...done')
if __name__ == '__main__':
cli(obj={})
......
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014,2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
......@@ -11,21 +11,20 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import boto3
class __AWS(object):
def __init__(self, profile_name=None, region_name=None):
self._client_cache = {}
self._session = boto3.session.Session(
region_name=region_name, profile_name=profile_name)
def create_client(self, client_name):
if client_name not in self._client_cache:
self._client_cache[client_name] = self._session.client(
client_name)
return self._client_cache[client_name]
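# Note: clients are cached per service name, so repeated calls like
# create_client('lambda') reuse the same boto3 client for this session.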
......
......@@ -13,10 +13,12 @@
import logging
import yaml
import time
import kappa.function
import kappa.event_source
import kappa.policy
import kappa.role
LOG = logging.getLogger(__name__)
......@@ -32,8 +34,16 @@ class Context(object):
else:
self.set_logger('kappa', logging.INFO)
self.config = yaml.load(config_file)
if 'policy' in self.config.get('iam', ''):
self.policy = kappa.policy.Policy(
self, self.config['iam']['policy'])
else:
self.policy = None
if 'role' in self.config.get('iam', ''):
self.role = kappa.role.Role(
self, self.config['iam']['role'])
else:
self.role = None
self.function = kappa.function.Function(
self, self.config['lambda'])
self.event_sources = []
......@@ -57,11 +67,7 @@ class Context(object):
@property
def exec_role_arn(self):
        return self.role.arn
def debug(self):
self.set_logger('kappa', logging.DEBUG)
......@@ -90,44 +96,88 @@ class Context(object):
log.addHandler(ch)
def _create_event_sources(self):
        if 'event_sources' in self.config['lambda']:
            for event_source_cfg in self.config['lambda']['event_sources']:
                _, _, svc, _ = event_source_cfg['arn'].split(':', 3)
                if svc == 'kinesis':
                    self.event_sources.append(
                        kappa.event_source.KinesisEventSource(
                            self, event_source_cfg))
                elif svc == 's3':
                    self.event_sources.append(kappa.event_source.S3EventSource(
                        self, event_source_cfg))
                elif svc == 'sns':
                    self.event_sources.append(
                        kappa.event_source.SNSEventSource(
                            self, event_source_cfg))
                elif svc == 'dynamodb':
                    self.event_sources.append(
                        kappa.event_source.DynamoDBStreamEventSource(
                            self, event_source_cfg))
                else:
                    msg = 'Unknown event source: %s' % event_source_cfg['arn']
                    raise ValueError(msg)
def add_event_sources(self):
for event_source in self.event_sources:
event_source.add(self.function)
def update_event_sources(self):
for event_source in self.event_sources:
event_source.update(self.function)
def create(self):
if self.policy:
self.policy.create()
if self.role:
self.role.create()
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
        LOG.debug('Waiting for policy/role propagation')
time.sleep(5)
self.function.create()
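        # An alternative to the fixed sleep above (illustrative sketch only,
        # not what kappa currently does) would be to retry the create call
        # until IAM propagation catches up, e.g.:
        #
        #   for attempt in range(5):
        #       try:
        #           self.function.create()
        #           break
        #       except Exception:
        #           LOG.debug('retrying; IAM role not yet propagated')
        #           time.sleep(2 ** attempt)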
def update_code(self):
self.function.update()
def invoke(self):
return self.function.invoke()
def dryrun(self):
return self.function.dryrun()
def invoke_async(self):
return self.function.invoke_async()
def tail(self):
return self.function.tail()
def delete(self):
for event_source in self.event_sources:
event_source.remove(self.function)
self.function.delete()
time.sleep(5)
if self.role:
self.role.delete()
time.sleep(5)
if self.policy:
self.policy.delete()
def status(self):
status = {}
if self.policy:
status['policy'] = self.policy.status()
else:
status['policy'] = None
if self.role:
status['role'] = self.role.status()
else:
status['role'] = None
status['function'] = self.function.status()
status['event_sources'] = []
if self.event_sources:
for event_source in self.event_sources:
status['event_sources'].append(
event_source.status(self.function))
return status
......
......@@ -31,9 +31,17 @@ class EventSource(object):
return self._config['arn']
@property
def starting_position(self):
return self._config.get('starting_position', 'TRIM_HORIZON')
@property
def batch_size(self):
return self._config.get('batch_size', 100)
@property
def enabled(self):
return self._config.get('enabled', True)
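# These properties are read from an 'event_sources' entry in the kappa config
# file; an illustrative entry overriding the defaults above would be:
#
#   event_sources:
#     -
#       arn: arn:aws:kinesis:us-east-1:123456789012:stream/mystream
#       starting_position: LATEST
#       batch_size: 50
#       enabled: false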
class KinesisEventSource(EventSource):
......@@ -44,46 +52,71 @@ class KinesisEventSource(EventSource):
def _get_uuid(self, function):
uuid = None
response = self._lambda.list_event_source_mappings(
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
if len(response['EventSourceMappings']) > 0:
uuid = response['EventSourceMappings'][0]['UUID']
return uuid
def add(self, function):
try:
            response = self._lambda.create_event_source_mapping(
                FunctionName=function.name,
                EventSourceArn=self.arn,
                BatchSize=self.batch_size,
                StartingPosition=self.starting_position,
                Enabled=self.enabled
            )
            LOG.debug(response)
        except Exception:
            LOG.exception('Unable to add event source')
def update(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
try:
                response = self._lambda.update_event_source_mapping(
                    UUID=uuid,
                    BatchSize=self.batch_size,
                    Enabled=self.enabled,
                    FunctionName=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update event source')
def remove(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.delete_event_source_mapping(
UUID=uuid)
LOG.debug(response)
return response
def status(self, function):
response = None
LOG.debug('getting status for event source %s', self.arn)
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.get_event_source_mapping(
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
LOG.debug('event source %s does not exist', self.arn)
response = None
else:
LOG.debug('No UUID for event source %s', self.arn)
return response
class DynamoDBStreamEventSource(KinesisEventSource):
pass
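# DynamoDB Streams are consumed through the same event source mapping API as
# Kinesis streams, which is why the subclass above needs no overrides.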
class S3EventSource(EventSource):
def __init__(self, context, config):
......@@ -134,3 +167,50 @@ class S3EventSource(EventSource):
if 'CloudFunctionConfiguration' not in response:
response = None
return response
class SNSEventSource(EventSource):
def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._sns = aws.create_client('sns')
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def exists(self, function):
try:
response = self._sns.list_subscriptions_by_topic(
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
if subscription['Endpoint'] == function.arn:
return subscription
return None
except Exception:
LOG.exception('Unable to find event source %s', self.arn)
def add(self, function):
try:
response = self._sns.subscribe(
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add SNS event source')
def remove(self, function):
LOG.debug('removing SNS event source')
try:
subscription = self.exists(function)
if subscription:
response = self._sns.unsubscribe(
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove event source %s', self.arn)
def status(self, function):
LOG.debug('status for SNS notification for %s', function.name)
        return self.exists(function)
......
......@@ -46,10 +46,6 @@ class Function(object):
return self._config['handler']
@property
def description(self):
return self._config['description']
......@@ -74,13 +70,17 @@ class Function(object):
return self._config['test_data']
@property
def permissions(self):
return self._config.get('permissions', list())
@property
def arn(self):
if self._arn is None:
try:
response = self._lambda_svc.get_function(
FunctionName=self.name)
LOG.debug(response)
self._arn = response['Configuration']['FunctionArn']
except Exception:
LOG.debug('Unable to find ARN for function: %s', self.name)
return self._arn
......@@ -124,30 +124,68 @@ class Function(object):
else:
self._zip_lambda_file(zipfile_name, lambda_fn)
def add_permissions(self):
for permission in self.permissions:
try:
kwargs = {
'FunctionName': self.name,
'StatementId': permission['statement_id'],
'Action': permission['action'],
'Principal': permission['principal']}
source_arn = permission.get('source_arn', None)
if source_arn:
kwargs['SourceArn'] = source_arn
source_account = permission.get('source_account', None)
if source_account:
kwargs['SourceAccount'] = source_account
response = self._lambda_svc.add_permission(**kwargs)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add permission')
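    # An illustrative 'permissions' entry from a kappa config file (compare
    # the SNS sample config later in this changeset):
    #
    #   permissions:
    #     -
    #       statement_id: sns_invoke
    #       action: lambda:invokeFunction
    #       principal: sns.amazonaws.com
    #       source_arn: arn:aws:sns:us-east-1:123456789012:lambda_topic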
def create(self):
LOG.debug('creating %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
exec_role = self._context.exec_role_arn
LOG.debug('exec_role=%s', exec_role)
try:
                zipdata = fp.read()
                response = self._lambda_svc.create_function(
                    FunctionName=self.name,
                    Code={'ZipFile': zipdata},
                    Runtime=self.runtime,
                    Role=exec_role,
                    Handler=self.handler,
                    Description=self.description,
                    Timeout=self.timeout,
                    MemorySize=self.memory_size)
LOG.debug(response)
except Exception:
LOG.exception('Unable to upload zip file')
self.add_permissions()
def update(self):
LOG.debug('updating %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
try:
zipdata = fp.read()
response = self._lambda_svc.update_function_code(
FunctionName=self.name,
ZipFile=zipdata)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update zip file')
def delete(self):
LOG.debug('deleting function %s', self.name)
response = None
try:
response = self._lambda_svc.delete_function(FunctionName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('function %s: not found', self.name)
return response
def status(self):
......@@ -169,5 +207,24 @@ class Function(object):
InvokeArgs=fp)
LOG.debug(response)
def _invoke(self, test_data, invocation_type):
if test_data is None:
test_data = self.test_data
LOG.debug('invoke %s', test_data)
with open(test_data) as fp:
response = self._lambda_svc.invoke(
FunctionName=self.name,
InvocationType=invocation_type,
LogType='Tail',
Payload=fp.read())
LOG.debug(response)
return response
def invoke(self, test_data=None):
return self._invoke(test_data, 'RequestResponse')
def invoke_async(self, test_data=None):
return self._invoke(test_data, 'Event')
def dryrun(self, test_data=None):
return self._invoke(test_data, 'DryRun')
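    # InvocationType semantics for the Lambda Invoke API: 'RequestResponse'
    # runs the function synchronously and returns its output, 'Event' queues
    # an asynchronous invocation, and 'DryRun' only validates permissions and
    # input without running the function.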
......
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import kappa.aws
LOG = logging.getLogger(__name__)
class Policy(object):
Path = '/kappa/'
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._iam_svc = aws.create_client('iam')
self._arn = None
@property
def name(self):
return self._config['name']
@property
def description(self):
return self._config.get('description', None)
@property
def document(self):
return self._config['document']
@property
def arn(self):
if self._arn is None:
policy = self.exists()
if policy:
self._arn = policy.get('Arn', None)
return self._arn
def exists(self):
try:
response = self._iam_svc.list_policies(PathPrefix=self.Path)
LOG.debug(response)
for policy in response['Policies']:
if policy['PolicyName'] == self.name:
return policy
except Exception:
LOG.exception('Error listing policies')
return None
def create(self):
LOG.debug('creating policy %s', self.name)
policy = self.exists()
if not policy:
with open(self.document, 'rb') as fp:
try:
response = self._iam_svc.create_policy(
Path=self.Path, PolicyName=self.name,
PolicyDocument=fp.read(),
Description=self.description)
LOG.debug(response)
except Exception:
LOG.exception('Error creating Policy')
def delete(self):
response = None
if self.arn:
LOG.debug('deleting policy %s', self.name)
response = self._iam_svc.delete_policy(PolicyArn=self.arn)
LOG.debug(response)
return response
def status(self):
LOG.debug('getting status for policy %s', self.name)
return self.exists()
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
LOG = logging.getLogger(__name__)
AssumeRolePolicyDocument = """{
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "lambda.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ]
} ]
}"""
class Role(object):
Path = '/kappa/'
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._iam_svc = aws.create_client('iam')
self._arn = None
@property
def name(self):
return self._config['name']
@property
def arn(self):
if self._arn is None:
try:
response = self._iam_svc.get_role(
RoleName=self.name)
LOG.debug(response)
self._arn = response['Role']['Arn']
except Exception:
LOG.debug('Unable to find ARN for role: %s', self.name)
return self._arn
def exists(self):
try:
response = self._iam_svc.list_roles(PathPrefix=self.Path)
LOG.debug(response)
for role in response['Roles']:
if role['RoleName'] == self.name:
return role
except Exception:
LOG.exception('Error listing roles')
return None
def create(self):
LOG.debug('creating role %s', self.name)
role = self.exists()
if not role:
try:
response = self._iam_svc.create_role(
Path=self.Path, RoleName=self.name,
AssumeRolePolicyDocument=AssumeRolePolicyDocument)
LOG.debug(response)
if self._context.policy:
LOG.debug('attaching policy %s', self._context.policy.arn)
response = self._iam_svc.attach_role_policy(
RoleName=self.name,
PolicyArn=self._context.policy.arn)
LOG.debug(response)
except ClientError:
LOG.exception('Error creating Role')
def delete(self):
response = None
LOG.debug('deleting role %s', self.name)
try:
LOG.debug('First detach the policy from the role')
policy_arn = self._context.policy.arn
if policy_arn:
response = self._iam_svc.detach_role_policy(
RoleName=self.name, PolicyArn=policy_arn)
LOG.debug(response)
response = self._iam_svc.delete_role(RoleName=self.name)
LOG.debug(response)
except ClientError:
LOG.exception('role %s not found', self.name)
return response
def status(self):
LOG.debug('getting status for role %s', self.name)
try:
response = self._iam_svc.get_role(RoleName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('role %s not found', self.name)
response = None
return response
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import time
import kappa.aws
LOG = logging.getLogger(__name__)
class Stack(object):
completed_states = ('CREATE_COMPLETE', 'UPDATE_COMPLETE')
failed_states = ('UPDATE_ROLLBACK_COMPLETE', 'ROLLBACK_COMPLETE')
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(self._context)
self._cfn = aws.create_client('cloudformation')
self._iam = aws.create_client('iam')
@property
def name(self):
return self._config['stack_name']
@property
def template_path(self):
return self._config['template']
@property
def exec_role(self):
return self._config['exec_role']
@property
def exec_role_arn(self):
return self._get_role_arn(self.exec_role)
@property
def invoke_role(self):
return self._config['invoke_role']
@property
def invoke_role_arn(self):
return self._get_role_arn(self.invoke_role)
def _get_role_arn(self, role_name):
role_arn = None
try:
resources = self._cfn.list_stack_resources(
StackName=self.name)
LOG.debug(resources)
except Exception:
LOG.exception('Unable to find role ARN: %s', role_name)
for resource in resources['StackResourceSummaries']:
if resource['LogicalResourceId'] == role_name:
role = self._iam.get_role(
RoleName=resource['PhysicalResourceId'])
LOG.debug(role)
role_arn = role['Role']['Arn']
LOG.debug('role_arn: %s', role_arn)
return role_arn
def exists(self):
"""
Does Cloudformation Stack already exist?
"""
try:
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug('Stack %s exists', self.name)
except Exception:
LOG.debug('Stack %s does not exist', self.name)
response = None
return response
def wait(self):
done = False
while not done:
time.sleep(1)
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug(response)
status = response['Stacks'][0]['StackStatus']
LOG.debug('Stack status is: %s', status)
if status in self.completed_states:
done = True
if status in self.failed_states:
msg = 'Could not create stack %s: %s' % (self.name, status)
raise ValueError(msg)
def _create(self):
LOG.debug('create_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.create_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to create stack')
self.wait()
def _update(self):
        LOG.debug('update_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.update_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception as e:
if 'ValidationError' in str(e):
LOG.info('No Updates Required')
else:
LOG.exception('Unable to update stack')
self.wait()
def update(self):
if self.exists():
self._update()
else:
self._create()
def status(self):
return self.exists()
def delete(self):
LOG.debug('delete_stack: stack_name=%s', self.name)
try:
response = self._cfn.delete_stack(StackName=self.name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to delete stack: %s', self.name)
boto3==0.0.16
click==4.0
PyYAML>=3.11
mock>=1.0.1
nose==1.3.1
......
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
......
---
profile: personal
region: us-east-1
iam:
role_name: KinesisSampleRole
role_policy: AWSLambdaKinesisExecutionRole
lambda:
name: KinesisSample
zipfile_name: KinesisSample.zip
......@@ -15,9 +13,10 @@ lambda:
runtime: nodejs
memory_size: 128
timeout: 3
event_sources:
-
arn: arn:aws:kinesis:us-east-1:084307701560:stream/lambdastream
starting_position: TRIM_HORIZON
batch_size: 100
test_data: input.json
\ No newline at end of file
......
......@@ -12,8 +12,8 @@
"invokeIdentityArn": "arn:aws:iam::059493405231:role/testLEBRole",
"eventVersion": "1.0",
"eventName": "aws:kinesis:record",
"eventSourceARN": "arn:aws:kinesis:us-east-1:35667example:stream/examplestream",
"awsRegion": "us-east-1"
"eventSourceARN": "arn:aws:kinesis:us-west-2:35667example:stream/examplestream",
"awsRegion": "us-west-2"
}
]
}
......
{
"Version": "2012-10-17",
"Statement":[
{
"Sid":"Stmt1428510662000",
"Effect":"Allow",
"Action":["dynamodb:*"],
"Resource":["arn:aws:dynamodb:us-east-1:084307701560:table/snslambda"]
}
]
}
---
profile: personal
region: us-east-1
resources: resources.json
iam:
policy:
description: A policy used with the Kappa SNS->DynamoDB example
name: LambdaSNSSamplePolicy
document: LambdaSNSSamplePolicy.json
role:
name: SNSSampleRole
policy: LambdaSNSSamplePolicy
lambda:
name: SNSSample
zipfile_name: SNSSample.zip
description: Testing SNS -> DynamoDB Lambda handler
path: messageStore.js
handler: messageStore.handler
runtime: nodejs
memory_size: 128
timeout: 3
permissions:
-
statement_id: sns_invoke
action: lambda:invokeFunction
principal: sns.amazonaws.com
source_arn: arn:aws:sns:us-east-1:084307701560:lambda_topic
event_sources:
-
arn: arn:aws:sns:us-east-1:084307701560:lambda_topic
test_data: input.json
\ No newline at end of file
{
"TableName": "snslambda",
"AttributeDefinitions": [
{
"AttributeName": "SnsTopicArn",
"AttributeType": "S"
},
{
"AttributeName": "SnsPublishTime",
"AttributeType": "S"
},
{
"AttributeName": "SnsMessageId",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "SnsTopicArn",
"KeyType": "HASH"
},
{
"AttributeName": "SnsPublishTime",
"KeyType": "RANGE"
}
],
"GlobalSecondaryIndexes": [
{
"IndexName": "MesssageIndex",
"KeySchema": [
{
"AttributeName": "SnsMessageId",
"KeyType": "HASH"
}
],
"Projection": {
"ProjectionType": "ALL"
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 1
}
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
console.log('Loading event');
var aws = require('aws-sdk');
var ddb = new aws.DynamoDB({params: {TableName: 'snslambda'}});
exports.handler = function(event, context) {
var SnsMessageId = event.Records[0].Sns.MessageId;
var SnsPublishTime = event.Records[0].Sns.Timestamp;
var SnsTopicArn = event.Records[0].Sns.TopicArn;
var LambdaReceiveTime = new Date().toString();
var itemParams = {Item: {SnsTopicArn: {S: SnsTopicArn},
SnsPublishTime: {S: SnsPublishTime}, SnsMessageId: {S: SnsMessageId},
LambdaReceiveTime: {S: LambdaReceiveTime} }};
ddb.putItem(itemParams, function() {
context.done(null,'');
});
};
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Creates the DynamoDB Table needed for the example",
"Resources" : {
"snslambda" : {
"Type" : "AWS::DynamoDB::Table",
"Properties" : {
"AttributeDefinitions": [
{
"AttributeName" : "SnsTopicArn",
"AttributeType" : "S"
},
{
"AttributeName" : "SnsPublishTime",
"AttributeType" : "S"
}
],
"KeySchema": [
{ "AttributeName": "SnsTopicArn", "KeyType": "HASH" },
{ "AttributeName": "SnsPublishTime", "KeyType": "RANGE" }
],
"ProvisionedThroughput" : {
"ReadCapacityUnits" : 5,
"WriteCapacityUnits" : 5
}
}
}
},
"Outputs" : {
"TableName" : {
"Value" : {"Ref" : "snslambda"},
"Description" : "Table name of the newly created DynamoDB table"
}
}
}
......@@ -5,8 +5,8 @@ from setuptools import setup, find_packages
import os
requires = [
'boto3==0.0.16',
'click==4.0',
'PyYAML>=3.11'
]
......
{
"Statement":[
{"Condition":
{"ArnLike":{"AWS:SourceArn":"arn:aws:sns:us-east-1:123456789012:lambda_topic"}},
"Resource":"arn:aws:lambda:us-east-1:123456789023:function:messageStore",
"Action":"lambda:invokeFunction",
"Principal":{"Service":"sns.amazonaws.com"},
"Sid":"sns invoke","Effect":"Allow"
}],
"Id":"default",
"Version":"2012-10-17"
}
import inspect
import mock
import tests.unit.responses as responses
......@@ -6,40 +8,23 @@ import tests.unit.responses as responses
class MockAWS(object):
def __init__(self, profile=None, region=None):
self.response_map = {}
for name, value in inspect.getmembers(responses):
if name.startswith('__'):
continue
if '_' in name:
service_name, request_name = name.split('_', 1)
if service_name not in self.response_map:
self.response_map[service_name] = {}
self.response_map[service_name][request_name] = value
def create_client(self, client_name):
client = None
if client_name in self.response_map:
client = mock.Mock()
for request in self.response_map[client_name]:
response = self.response_map[client_name][request]
setattr(client, request, mock.Mock(side_effect=response))
return client
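# With the response_map built in __init__, adding a canned response is just a
# matter of defining a module-level name in tests.unit.responses of the form
# '<service>_<operation>' (e.g. 'iam_get_role'); create_client then wires it
# up as client.get_role = mock.Mock(side_effect=<responses list>).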
......
import datetime
from dateutil.tz import tzutc
iam_list_policies = [{u'IsTruncated': True,
u'Marker': 'ABcyoYmSlphARcitCJruhVIxKW3Hg1LJD3Fm4LAW8iGKykrSNrApiUoz2rjIuNiLJpT6JtUgP5M7wTuPZcHu1KsvMarvgFBFQObTPSa4WF22Zg==',
u'Policies': [{u'Arn': 'arn:aws:iam::123456789012:policy/FooPolicy',
u'AttachmentCount': 0,
u'CreateDate': datetime.datetime(2015, 2, 24, 3, 16, 24, tzinfo=tzutc()),
u'DefaultVersionId': 'v2',
u'IsAttachable': True,
u'Path': '/',
u'PolicyId': 'ANPAJHWE6R7YT7PLAH3KG',
u'PolicyName': 'FooPolicy',
u'UpdateDate': datetime.datetime(2015, 2, 25, 0, 19, 12, tzinfo=tzutc())},
{u'Arn': 'arn:aws:iam::123456789012:policy/BarPolicy',
u'AttachmentCount': 1,
u'CreateDate': datetime.datetime(2015, 2, 25, 0, 11, 57, tzinfo=tzutc()),
u'DefaultVersionId': 'v2',
u'IsAttachable': True,
u'Path': '/',
u'PolicyId': 'ANPAJU7MVBQXOQTVQN3VM',
u'PolicyName': 'BarPolicy',
u'UpdateDate': datetime.datetime(2015, 2, 25, 0, 13, 8, tzinfo=tzutc())},
{u'Arn': 'arn:aws:iam::123456789012:policy/FiePolicy',
u'AttachmentCount': 1,
u'CreateDate': datetime.datetime(2015, 3, 21, 19, 18, 21, tzinfo=tzutc()),
u'DefaultVersionId': 'v4',
u'IsAttachable': True,
u'Path': '/',
u'PolicyId': 'ANPAIXQ72B2OH2RZPYQ4Y',
u'PolicyName': 'FiePolicy',
u'UpdateDate': datetime.datetime(2015, 3, 26, 23, 26, 52, tzinfo=tzutc())}],
'ResponseMetadata': {'HTTPStatusCode': 200,
'RequestId': '4e87c995-ecf2-11e4-bb10-51f1499b3162'}}]
iam_create_policy = [{u'Policy': {u'PolicyName': 'LambdaChatDynamoDBPolicy', u'CreateDate': datetime.datetime(2015, 4, 27, 12, 13, 35, 240000, tzinfo=tzutc()), u'AttachmentCount': 0, u'IsAttachable': True, u'PolicyId': 'ANPAISQNU4EPZZDVZUOKU', u'DefaultVersionId': 'v1', u'Path': '/kappa/', u'Arn': 'arn:aws:iam::658794617753:policy/kappa/LambdaChatDynamoDBPolicy', u'UpdateDate': datetime.datetime(2015, 4, 27, 12, 13, 35, 240000, tzinfo=tzutc())}, 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'd403e95f-ecd6-11e4-9ee0-15e8b71db930'}}]
iam_list_roles = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'd41415ff-ecd6-11e4-bb10-51f1499b3162'}, u'IsTruncated': False, u'Roles': [{u'AssumeRolePolicyDocument': {u'Version': u'2012-10-17', u'Statement': [{u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u'lambda.amazonaws.com'}, u'Effect': u'Allow', u'Sid': u''}]}, u'RoleId': 'AROAJ4JSNL3M4UYI6GDYS', u'CreateDate': datetime.datetime(2015, 4, 27, 11, 59, 19, tzinfo=tzutc()), u'RoleName': 'FooRole', u'Path': '/kappa/', u'Arn': 'arn:aws:iam::123456789012:role/kappa/FooRole'}]}]
iam_create_role = [{u'Role': {u'AssumeRolePolicyDocument': {u'Version': u'2012-10-17', u'Statement': [{u'Action': [u'sts:AssumeRole'], u'Effect': u'Allow', u'Principal': {u'Service': [u'lambda.amazonaws.com']}}]}, u'RoleId': 'AROAIT2ZRRPQBOIBBHPZU', u'CreateDate': datetime.datetime(2015, 4, 27, 12, 13, 35, 426000, tzinfo=tzutc()), u'RoleName': 'BazRole', u'Path': '/kappa/', u'Arn': 'arn:aws:iam::123456789012:role/kappa/BazRole'}, 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'd41fd55c-ecd6-11e4-9fd8-03ee0021e940'}}]
iam_get_role = [{u'Role': {u'AssumeRolePolicyDocument': {u'Version': u'2012-10-17', u'Statement': [{u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u's3.amazonaws.com'}, u'Effect': u'Allow', u'Condition': {u'ArnLike': {u'sts:ExternalId': u'arn:aws:s3:::*'}}, u'Sid': u''}, {u'Action': u'sts:AssumeRole', u'Principal': {u'Service': u'lambda.amazonaws.com'}, u'Effect': u'Allow', u'Sid': u''}]}, u'RoleId': 'AROAIEVJHUJG2I4MG5PSC', u'CreateDate': datetime.datetime(2015, 1, 6, 17, 37, 44, tzinfo=tzutc()), u'RoleName': 'TestKinesis-InvokeRole-IF6VUXY9MBJN', u'Path': '/', u'Arn': 'arn:aws:iam::0123456789012:role/TestKinesis-InvokeRole-FOO'}, 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'dd6e8d42-9699-11e4-afe6-d3625e8b365b'}}]
iam_attach_role_policy = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'd43e32dc-ecd6-11e4-9fd8-03ee0021e940'}}]
iam_detach_role_policy = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'a7d30b51-ecd6-11e4-bbe4-d996b8ad5d9e'}}]
iam_delete_role = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'a7e5a97e-ecd6-11e4-ae9e-6dee7bf37e66'}}]
lambda_create_function = [{u'FunctionName': u'LambdaChatDynamoDB', 'ResponseMetadata': {'HTTPStatusCode': 201, 'RequestId': 'd7840efb-ecd6-11e4-b8b0-f7f3177894e9'}, u'CodeSize': 22024, u'MemorySize': 128, u'FunctionArn': u'arn:aws:lambda:us-east-1:123456789012:function:FooBarFunction', u'Handler': u'FooBarFunction.handler', u'Role': u'arn:aws:iam::123456789012:role/kappa/BazRole', u'Timeout': 3, u'LastModified': u'2015-04-27T12:13:41.147+0000', u'Runtime': u'nodejs', u'Description': u'A FooBar function'}]
lambda_delete_function = [{'ResponseMetadata': {'HTTPStatusCode': 204, 'RequestId': 'a499b2c2-ecd6-11e4-8d2a-77b7e55836e7'}}]
logs_describe_log_groups = [{'ResponseMetadata': {'HTTPStatusCode': 200,
'RequestId': 'da962431-afed-11e4-8c17-1776597471e6'},
u'logGroups': [{u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample*',
......@@ -23,13 +69,3 @@ logs_describe_log_groups = [{'ResponseMetadata': {'HTTPStatusCode': 200,
logs_describe_log_streams = [{u'logStreams': [{u'firstEventTimestamp': 1417042749449, u'lastEventTimestamp': 1417042749547, u'creationTime': 1417042748263, u'uploadSequenceToken': u'49540114640150833041490484409222729829873988799393975922', u'logStreamName': u'1cc48e4e613246b7974094323259d600', u'lastIngestionTime': 1417042750483, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:1cc48e4e613246b7974094323259d600', u'storedBytes': 712}, {u'firstEventTimestamp': 1417272406988, u'lastEventTimestamp': 1417272407088, u'creationTime': 1417272405690, u'uploadSequenceToken': u'49540113907504451034164105858363493278561872472363261986', u'logStreamName': u'2782a5ff88824c85a9639480d1ed7bbe', u'lastIngestionTime': 1417272408043, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:2782a5ff88824c85a9639480d1ed7bbe', u'storedBytes': 712}, {u'firstEventTimestamp': 1420569035842, u'lastEventTimestamp': 1420569035941, u'creationTime': 1420569034614, u'uploadSequenceToken': u'49540113907883563702539166025438885323514410026454245426', u'logStreamName': u'2d62991a479b4ebf9486176122b72a55', u'lastIngestionTime': 1420569036909, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:2d62991a479b4ebf9486176122b72a55', u'storedBytes': 709}, {u'firstEventTimestamp': 1418244027421, u'lastEventTimestamp': 1418244027541, u'creationTime': 1418244026907, u'uploadSequenceToken': u'49540113964795065449189116778452984186276757901477438642', u'logStreamName': u'4f44ffa128d6405591ca83b2b0f9dd2d', u'lastIngestionTime': 1418244028484, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:4f44ffa128d6405591ca83b2b0f9dd2d', u'storedBytes': 1010}, {u'firstEventTimestamp': 1418242565524, u'lastEventTimestamp': 1418242565641, u'creationTime': 1418242564196, u'uploadSequenceToken': u'49540113095132904942090446312687285178819573422397343074', u'logStreamName': u'69c5ac87e7e6415985116e8cb44e538e', u'lastIngestionTime': 1418242566558, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:69c5ac87e7e6415985116e8cb44e538e', u'storedBytes': 713}, {u'firstEventTimestamp': 1417213193378, u'lastEventTimestamp': 1417213193478, u'creationTime': 1417213192095, u'uploadSequenceToken': u'49540113336360065754596187770479764234792559857643841394', u'logStreamName': u'f68e3d87b8a14cdba338f6926f7cf50a', u'lastIngestionTime': 1417213194421, u'arn': u'arn:aws:logs:us-east-1:0123456789012:log-group:/aws/lambda/KinesisSample:log-stream:f68e3d87b8a14cdba338f6926f7cf50a', u'storedBytes': 711}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '2a6d4941-969b-11e4-947f-19d1c72ede7e'}}]
logs_get_log_events = [{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '2a7deb71-969b-11e4-914b-8f1f3d7b023d'}, u'nextForwardToken': u'f/31679748107442531967654742688057700554200447759088287749', u'events': [{u'ingestionTime': 1420569036909, u'timestamp': 1420569035842, u'message': u'2015-01-06T18:30:35.841Z\tko2sss03iq7l2pdk\tLoading event\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035899, u'message': u'START RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035940, u'message': u'2015-01-06T18:30:35.940Z\t23007242-95d2-11e4-a10e-7b2ab60a7770\t{\n "Records": [\n {\n "kinesis": {\n "partitionKey": "partitionKey-3",\n "kinesisSchemaVersion": "1.0",\n "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",\n "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"\n },\n "eventSource": "aws:kinesis",\n "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",\n "invokeIdentityArn": "arn:aws:iam::0123456789012:role/testLEBRole",\n "eventVersion": "1.0",\n "eventName": "aws:kinesis:record",\n "eventSourceARN": "arn:aws:kinesis:us-east-1:35667example:stream/examplestream",\n "awsRegion": "us-east-1"\n }\n ]\n}\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035940, u'message': u'2015-01-06T18:30:35.940Z\t23007242-95d2-11e4-a10e-7b2ab60a7770\tDecoded payload: Hello, this is a test 123.\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035941, u'message': u'END RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\n'}, {u'ingestionTime': 1420569036909, u'timestamp': 1420569035941, u'message': u'REPORT RequestId: 23007242-95d2-11e4-a10e-7b2ab60a7770\tDuration: 98.51 ms\tBilled Duration: 100 ms \tMemory Size: 128 MB\tMax Memory Used: 26 MB\t\n'}], u'nextBackwardToken': u'b/31679748105234758193000210997045664445208259969996226560'}]
......
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import unittest
import os
import mock
from kappa.policy import Policy
from tests.unit.mock_aws import get_aws
Config1 = {
'name': 'FooPolicy',
'description': 'This is the Foo policy',
'document': 'FooPolicy.json'}
Config2 = {
'name': 'BazPolicy',
'description': 'This is the Baz policy',
'document': 'BazPolicy.json'}
def path(filename):
return os.path.join(os.path.dirname(__file__), 'data', filename)
class TestPolicy(unittest.TestCase):
def setUp(self):
self.aws_patch = mock.patch('kappa.aws.get_aws', get_aws)
self.mock_aws = self.aws_patch.start()
Config1['document'] = path(Config1['document'])
Config2['document'] = path(Config2['document'])
def tearDown(self):
self.aws_patch.stop()
def test_properties(self):
mock_context = mock.Mock()
policy = Policy(mock_context, Config1)
self.assertEqual(policy.name, Config1['name'])
self.assertEqual(policy.document, Config1['document'])
self.assertEqual(policy.description, Config1['description'])
def test_exists(self):
mock_context = mock.Mock()
policy = Policy(mock_context, Config1)
self.assertTrue(policy.exists())
def test_not_exists(self):
mock_context = mock.Mock()
policy = Policy(mock_context, Config2)
self.assertFalse(policy.exists())
def test_create(self):
mock_context = mock.Mock()
policy = Policy(mock_context, Config2)
policy.create()
def test_delete(self):
mock_context = mock.Mock()
policy = Policy(mock_context, Config1)
policy.delete()
......@@ -12,56 +12,47 @@
# language governing permissions and limitations under the License.
import unittest
import os
import mock
from kappa.role import Role
from tests.unit.mock_aws import get_aws
Config1 = {'name': 'FooRole'}
Config2 = {'name': 'BazRole'}
def path(filename):
return os.path.join(os.path.dirname(__file__), 'data', filename)
class TestRole(unittest.TestCase):
def setUp(self):
self.aws_patch = mock.patch('kappa.aws.get_aws', get_aws)
self.mock_aws = self.aws_patch.start()
def tearDown(self):
self.aws_patch.stop()
def test_properties(self):
mock_context = mock.Mock()
role = Role(mock_context, Config1)
self.assertEqual(role.name, Config1['name'])
def test_exists(self):
mock_context = mock.Mock()
role = Role(mock_context, Config1)
self.assertTrue(role.exists())
def test_not_exists(self):
mock_context = mock.Mock()
role = Role(mock_context, Config2)
self.assertFalse(role.exists())
    def test_create(self):
        mock_context = mock.Mock()
        role = Role(mock_context, Config2)
        role.create()
def test_delete(self):
mock_context = mock.Mock()
role = Role(mock_context, Config1)
role.delete()
......