Mitch Garnaat

WIP Commit. Updating to use new GA version of the Lambda API. Also moving from…

… botocore to boto3.  Also adding SNS example.  No longer using CloudFormation for policies since we only need one and CloudFormation does not yet support managed policies.  Haven't updated any tests yet so they will all be failing for now.  Also need to update README.
......@@ -13,6 +13,7 @@
# language governing permissions and limitations under the License.
from datetime import datetime
import logging
import base64
import click
......@@ -38,18 +39,20 @@ def cli(ctx, config=None, debug=False):
@cli.command()
@click.pass_context
def deploy(ctx):
def create(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('deploying...')
context.deploy()
context.create()
click.echo('...done')
@cli.command()
@click.pass_context
def test(ctx):
def invoke(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('testing...')
context.test()
click.echo('invoking...')
response = context.invoke()
log_data = base64.b64decode(response['LogResult'])
click.echo(log_data)
click.echo('...done')
@cli.command()
......@@ -67,31 +70,32 @@ def tail(ctx):
def status(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
status = context.status()
click.echo(click.style('Stack', bold=True))
if status['stack']:
for stack in status['stack']['Stacks']:
line = ' {}: {}'.format(stack['StackId'], stack['StackStatus'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Policy', bold=True))
if status['policy']:
line = ' {} ({})'.format(
status['policy']['PolicyName'],
status['policy']['Arn'])
click.echo(click.style(line, fg='green'))
click.echo(click.style('Role', bold=True))
if status['role']:
line = ' {} ({})'.format(
status['role']['Role']['RoleName'],
status['role']['Role']['Arn'])
click.echo(click.style(line, fg='green'))
click.echo(click.style('Function', bold=True))
if status['function']:
line = ' {}'.format(
status['function']['Configuration']['FunctionName'])
line = ' {} ({})'.format(
status['function']['Configuration']['FunctionName'],
status['function']['Configuration']['FunctionArn'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
click.echo(click.style('Event Sources', bold=True))
if status['event_sources']:
for event_source in status['event_sources']:
if 'EventSource' in event_source:
line = ' {}: {}'.format(
event_source['EventSource'], event_source['IsActive'])
click.echo(click.style(line, fg='green'))
else:
line = ' {}'.format(
event_source['CloudFunctionConfiguration']['Id'])
click.echo(click.style(line, fg='green'))
line = ' {}: {}'.format(
event_source['EventSourceArn'], event_source['State'])
click.echo(click.style(line, fg='green'))
else:
click.echo(click.style(' None', fg='green'))
......
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2014,2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
......@@ -11,21 +11,20 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import botocore.session
import boto3
class __AWS(object):
def __init__(self, profile=None, region=None):
def __init__(self, profile_name=None, region_name=None):
self._client_cache = {}
self._session = botocore.session.get_session()
self._session.profile = profile
self._region = region
self._session = boto3.session.Session(
region_name=region_name, profile_name=profile_name)
def create_client(self, client_name):
if client_name not in self._client_cache:
self._client_cache[client_name] = self._session.create_client(
client_name, self._region)
self._client_cache[client_name] = self._session.client(
client_name)
return self._client_cache[client_name]
......
......@@ -16,7 +16,8 @@ import yaml
import kappa.function
import kappa.event_source
import kappa.stack
import kappa.policy
import kappa.role
LOG = logging.getLogger(__name__)
......@@ -32,8 +33,16 @@ class Context(object):
else:
self.set_logger('kappa', logging.INFO)
self.config = yaml.load(config_file)
self._stack = kappa.stack.Stack(
self, self.config['cloudformation'])
if 'policy' in self.config.get('iam', ''):
self.policy = kappa.policy.Policy(
self, self.config['iam']['policy'])
else:
self.policy = None
if 'role' in self.config.get('iam', ''):
self.role = kappa.role.Role(
self, self.config['iam']['role'])
else:
self.role = None
self.function = kappa.function.Function(
self, self.config['lambda'])
self.event_sources = []
......@@ -57,11 +66,7 @@ class Context(object):
@property
def exec_role_arn(self):
return self._stack.exec_role_arn
@property
def invoke_role_arn(self):
return self._stack.invoke_role_arn
return self.role.arn
def debug(self):
self.set_logger('kappa', logging.DEBUG)
......@@ -90,44 +95,64 @@ class Context(object):
log.addHandler(ch)
def _create_event_sources(self):
for event_source_cfg in self.config['lambda']['event_sources']:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
kappa.event_source.KinesisEventSource(
if 'event_sources' in self.config['lambda']:
for event_source_cfg in self.config['lambda']['event_sources']:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
kappa.event_source.KinesisEventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
else:
msg = 'Unsupported event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
elif svc == 'sns':
self.event_sources.append(
kappa.event_source.SNSEventSource(self,
event_source_cfg))
else:
msg = 'Unknown event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
def add_event_sources(self):
for event_source in self.event_sources:
event_source.add(self.function)
def deploy(self):
self._stack.update()
self.function.upload()
def create(self):
if self.policy:
self.policy.create()
if self.role:
self.role.create()
self.function.create()
def test(self):
self.function.test()
def invoke(self):
return self.function.invoke()
def tail(self):
return self.function.tail()
def delete(self):
self._stack.delete()
if self.policy:
self.policy.delete()
if self.role:
self.role.delete()
self.function.delete()
for event_source in self.event_sources:
event_source.remove(self.function)
def status(self):
status = {}
status['stack'] = self._stack.status()
if self.policy:
status['policy'] = self.policy.status()
else:
status['policy'] = None
if self.role:
status['role'] = self.role.status()
else:
status['role'] = None
status['function'] = self.function.status()
status['event_sources'] = []
for event_source in self.event_sources:
status['event_sources'].append(event_source.status(self.function))
if self.event_sources:
for event_source in self.event_sources:
status['event_sources'].append(
event_source.status(self.function))
return status
......
......@@ -31,6 +31,10 @@ class EventSource(object):
return self._config['arn']
@property
def starting_position(self):
return self._config.get('starting_position', 'TRIM_HORIZON')
@property
def batch_size(self):
return self._config.get('batch_size', 100)
......@@ -44,21 +48,21 @@ class KinesisEventSource(EventSource):
def _get_uuid(self, function):
uuid = None
response = self._lambda.list_event_sources(
response = self._lambda.list_event_source_mappings(
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
if len(response['EventSources']) > 0:
uuid = response['EventSources'][0]['UUID']
if len(response['EventSourceMappings']) > 0:
uuid = response['EventSourceMappings'][0]['UUID']
return uuid
def add(self, function):
try:
response = self._lambda.add_event_source(
response = self._lambda.create_event_source_mapping(
FunctionName=function.name,
Role=self._context.invoke_role_arn,
EventSource=self.arn,
BatchSize=self.batch_size)
EventSourceArn=self.arn,
BatchSize=self.batch_size,
StartingPosition=self.starting_position)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add Kinesis event source')
......@@ -67,7 +71,7 @@ class KinesisEventSource(EventSource):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.remove_event_source(
response = self._lambda.delete_event_source_mapping(
UUID=uuid)
LOG.debug(response)
return response
......@@ -75,7 +79,7 @@ class KinesisEventSource(EventSource):
def status(self, function):
LOG.debug('getting status for event source %s', self.arn)
try:
response = self._lambda.get_event_source(
response = self._lambda.get_event_source_mapping(
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
......@@ -134,3 +138,50 @@ class S3EventSource(EventSource):
if 'CloudFunctionConfiguration' not in response:
response = None
return response
class SNSEventSource(EventSource):
def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
aws = kappa.aws.get_aws(context)
self._sns = aws.create_client('sns')
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def exists(self, function):
try:
response = self._sns.list_subscriptions_by_topic(
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
if subscription['Endpoint'] == function.arn:
return subscription
return None
except Exception:
LOG.exception('Unable to find event source %s', self.arn)
def add(self, function):
try:
response = self._sns.subscribe(
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add SNS event source')
def remove(self, function):
LOG.debug('removing SNS event source')
try:
subscription = self.exists(function)
if subscription:
response = self._sns.unsubscribe(
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove event source %s', self.arn)
def status(self, function):
LOG.debug('status for SNS notification for %s', function.name)
return self.exist(function)
......
......@@ -46,10 +46,6 @@ class Function(object):
return self._config['handler']
@property
def mode(self):
return self._config['mode']
@property
def description(self):
return self._config['description']
......@@ -74,13 +70,17 @@ class Function(object):
return self._config['test_data']
@property
def permissions(self):
return self._config.get('permissions', list())
@property
def arn(self):
if self._arn is None:
try:
response = self._lambda_svc.get_function_configuration(
response = self._lambda_svc.get_function(
FunctionName=self.name)
LOG.debug(response)
self._arn = response['FunctionARN']
self._arn = response['Configuration']['FunctionArn']
except Exception:
LOG.debug('Unable to find ARN for function: %s', self.name)
return self._arn
......@@ -124,25 +124,45 @@ class Function(object):
else:
self._zip_lambda_file(zipfile_name, lambda_fn)
def upload(self):
LOG.debug('uploading %s', self.zipfile_name)
def add_permissions(self):
for permission in self.permissions:
try:
kwargs = {
'FunctionName': self.name,
'StatementId': permission['statement_id'],
'Action': permission['action'],
'Principal': permission['principal']}
source_arn = permission.get('source_arn', None)
if source_arn:
kwargs['SourceArn'] = source_arn
source_account = permission.get('source_account', None)
if source_account:
kwargs['SourceAccount'] = source_account
response = self._lambda_svc.add_permission(**kwargs)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add permission')
def create(self):
LOG.debug('creating %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
exec_role = self._context.exec_role_arn
try:
response = self._lambda_svc.upload_function(
zipdata = fp.read()
response = self._lambda_svc.create_function(
FunctionName=self.name,
FunctionZip=fp,
Code={'ZipFile': zipdata},
Runtime=self.runtime,
Role=exec_role,
Handler=self.handler,
Mode=self.mode,
Description=self.description,
Timeout=self.timeout,
MemorySize=self.memory_size)
LOG.debug(response)
except Exception:
LOG.exception('Unable to upload zip file')
self.add_permissions()
def delete(self):
LOG.debug('deleting function %s', self.name)
......@@ -169,5 +189,14 @@ class Function(object):
InvokeArgs=fp)
LOG.debug(response)
def test(self):
self.invoke_asynch(self.test_data)
def invoke(self, test_data=None):
if test_data is None:
test_data = self.test_data
LOG.debug('invoke %s', test_data)
with open(test_data) as fp:
response = self._lambda_svc.invoke(
FunctionName=self.name,
LogType='Tail',
Payload=fp.read())
LOG.debug(response)
return response
......
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
LOG = logging.getLogger(__name__)
class Policy(object):
Path = '/kappa/'
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._iam_svc = aws.create_client('iam')
self._arn = None
@property
def name(self):
return self._config['name']
@property
def description(self):
return self._config.get('description', None)
@property
def document(self):
return self._config['document']
@property
def arn(self):
if self._arn is None:
policy = self.exists()
if policy:
self._arn = policy.get('Arn', None)
return self._arn
def exists(self):
try:
response = self._iam_svc.list_policies(PathPrefix=self.Path)
LOG.debug(response)
for policy in response['Policies']:
if policy['PolicyName'] == self.name:
return policy
except Exception:
LOG.exception('Error listing policies')
return None
def create(self):
LOG.debug('creating policy %s', self.name)
policy = self.exists()
if not policy:
with open(self.document, 'rb') as fp:
try:
response = self._iam_svc.create_policy(
Path=self.Path, PolicyName=self.name,
PolicyDocument=fp.read(),
Description=self.description)
LOG.debug(response)
except Exception:
LOG.exception('Error creating Policy')
def delete(self):
LOG.debug('deleting policy %s', self.name)
response = self._iam_svc.delete_policy(PolicyArn=self.arn)
LOG.debug(response)
return response
def status(self):
LOG.debug('getting status for policy %s', self.name)
return self.exists()
# Copyright (c) 2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
from botocore.exceptions import ClientError
import kappa.aws
LOG = logging.getLogger(__name__)
AssumeRolePolicyDocument = """{
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "lambda.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ]
} ]
}"""
class Role(object):
Path = '/kappa/'
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(context)
self._iam_svc = aws.create_client('iam')
self._arn = None
@property
def name(self):
return self._config['name']
@property
def arn(self):
if self._arn is None:
try:
response = self._iam_svc.get_role(
RoleName=self.name)
LOG.debug(response)
self._arn = response['Role']['Arn']
except Exception:
LOG.debug('Unable to find ARN for role: %s', self.name)
return self._arn
def exists(self):
try:
response = self._iam_svc.list_roles(PathPrefix=self.Path)
LOG.debug(response)
for role in response['Roles']:
if role['RoleName'] == self.name:
return role
except Exception:
LOG.exception('Error listing roles')
return None
def create(self):
LOG.debug('creating role %s', self.name)
role = self.exists()
if not role:
try:
response = self._iam_svc.create_role(
Path=self.Path, RoleName=self.name,
AssumeRolePolicyDocument=AssumeRolePolicyDocument)
LOG.debug(response)
except Exception:
LOG.exception('Error creating Role')
def delete(self):
LOG.debug('deleting role %s', self.name)
response = self._iam_svc.delete_role(RoleName=self.name)
LOG.debug(response)
return response
def status(self):
LOG.debug('getting status for role %s', self.name)
try:
response = self._iam_svc.get_role(RoleName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('role %s not found', self.name)
response = None
return response
# Copyright (c) 2014 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import time
import kappa.aws
LOG = logging.getLogger(__name__)
class Stack(object):
completed_states = ('CREATE_COMPLETE', 'UPDATE_COMPLETE')
failed_states = ('UPDATE_ROLLBACK_COMPLETE', 'ROLLBACK_COMPLETE')
def __init__(self, context, config):
self._context = context
self._config = config
aws = kappa.aws.get_aws(self._context)
self._cfn = aws.create_client('cloudformation')
self._iam = aws.create_client('iam')
@property
def name(self):
return self._config['stack_name']
@property
def template_path(self):
return self._config['template']
@property
def exec_role(self):
return self._config['exec_role']
@property
def exec_role_arn(self):
return self._get_role_arn(self.exec_role)
@property
def invoke_role(self):
return self._config['invoke_role']
@property
def invoke_role_arn(self):
return self._get_role_arn(self.invoke_role)
def _get_role_arn(self, role_name):
role_arn = None
try:
resources = self._cfn.list_stack_resources(
StackName=self.name)
LOG.debug(resources)
except Exception:
LOG.exception('Unable to find role ARN: %s', role_name)
for resource in resources['StackResourceSummaries']:
if resource['LogicalResourceId'] == role_name:
role = self._iam.get_role(
RoleName=resource['PhysicalResourceId'])
LOG.debug(role)
role_arn = role['Role']['Arn']
LOG.debug('role_arn: %s', role_arn)
return role_arn
def exists(self):
"""
Does Cloudformation Stack already exist?
"""
try:
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug('Stack %s exists', self.name)
except Exception:
LOG.debug('Stack %s does not exist', self.name)
response = None
return response
def wait(self):
done = False
while not done:
time.sleep(1)
response = self._cfn.describe_stacks(StackName=self.name)
LOG.debug(response)
status = response['Stacks'][0]['StackStatus']
LOG.debug('Stack status is: %s', status)
if status in self.completed_states:
done = True
if status in self.failed_states:
msg = 'Could not create stack %s: %s' % (self.name, status)
raise ValueError(msg)
def _create(self):
LOG.debug('create_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.create_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to create stack')
self.wait()
def _update(self):
LOG.debug('create_stack: stack_name=%s', self.name)
template_body = open(self.template_path).read()
try:
response = self._cfn.update_stack(
StackName=self.name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
LOG.debug(response)
except Exception as e:
if 'ValidationError' in str(e):
LOG.info('No Updates Required')
else:
LOG.exception('Unable to update stack')
self.wait()
def update(self):
if self.exists():
self._update()
else:
self._create()
def status(self):
return self.exists()
def delete(self):
LOG.debug('delete_stack: stack_name=%s', self.name)
try:
response = self._cfn.delete_stack(StackName=self.name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to delete stack: %s', self.name)
botocore==0.94.0
click==3.3
boto3==0.0.15
click==4.0
PyYAML>=3.11
mock>=1.0.1
nose==1.3.1
......
console.log('Loading event');
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, ' '));
for(i = 0; i < event.Records.length; ++i) {
encodedPayload = event.Records[i].kinesis.data;
payload = new Buffer(encodedPayload, 'base64').toString('ascii');
console.log("Decoded payload: " + payload);
}
context.done(null, "Hello World"); // SUCCESS with message
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
......
---
profile: personal
region: us-east-1
cloudformation:
template: roles.cf
stack_name: TestKinesis
exec_role: ExecRole
invoke_role: InvokeRole
iam:
role_name: KinesisSampleRole
role_policy: AWSLambdaKinesisExecutionRole
lambda:
name: KinesisSample
zipfile_name: KinesisSample.zip
......@@ -15,9 +13,10 @@ lambda:
runtime: nodejs
memory_size: 128
timeout: 3
mode: event
event_sources:
-
arn: arn:aws:kinesis:us-east-1:084307701560:stream/lambdastream
starting_position: TRIM_HORIZON
batch_size: 100
test_data: input.json
\ No newline at end of file
......
......@@ -12,8 +12,8 @@
"invokeIdentityArn": "arn:aws:iam::059493405231:role/testLEBRole",
"eventVersion": "1.0",
"eventName": "aws:kinesis:record",
"eventSourceARN": "arn:aws:kinesis:us-east-1:35667example:stream/examplestream",
"awsRegion": "us-east-1"
"eventSourceARN": "arn:aws:kinesis:us-west-2:35667example:stream/examplestream",
"awsRegion": "us-west-2"
}
]
}
......
{
"Version": "2012-10-17",
"Statement":[
{
"Sid":"Stmt1428510662000",
"Effect":"Allow",
"Action":["dynamodb:*"],
"Resource":["arn:aws:dynamodb:us-east-1:084307701560:table/snslambda"]
}
]
}
---
profile: personal
region: us-east-1
resources: resources.json
iam:
policy:
description: A policy used with the Kappa SNS->DynamoDB example
name: LambdaSNSSamplePolicy
document: LambdaSNSSamplePolicy.json
role:
name: SNSSampleRole
policy: LambdaSNSSamplePolicy
lambda:
name: SNSSample
zipfile_name: SNSSample.zip
description: Testing SNS -> DynamoDB Lambda handler
path: messageStore.js
handler: messageStore.handler
runtime: nodejs
memory_size: 128
timeout: 3
permissions:
-
statement_id: sns_invoke
action: lambda:invokeFunction
principal: sns.amazonaws.com
source_arn: arn:aws:sns:us-east-1:084307701560:lambda_topic
event_sources:
-
arn: arn:aws:sns:us-east-1:084307701560:lambda_topic
test_data: input.json
\ No newline at end of file
{
"TableName": "snslambda",
"AttributeDefinitions": [
{
"AttributeName": "SnsTopicArn",
"AttributeType": "S"
},
{
"AttributeName": "SnsPublishTime",
"AttributeType": "S"
},
{
"AttributeName": "SnsMessageId",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "SnsTopicArn",
"KeyType": "HASH"
},
{
"AttributeName": "SnsPublishTime",
"KeyType": "RANGE"
}
],
"GlobalSecondaryIndexes": [
{
"IndexName": "MesssageIndex",
"KeySchema": [
{
"AttributeName": "SnsMessageId",
"KeyType": "HASH"
}
],
"Projection": {
"ProjectionType": "ALL"
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 1
}
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
console.log('Loading event');
var aws = require('aws-sdk');
var ddb = new aws.DynamoDB({params: {TableName: 'snslambda'}});
exports.handler = function(event, context) {
var SnsMessageId = event.Records[0].Sns.MessageId;
var SnsPublishTime = event.Records[0].Sns.Timestamp;
var SnsTopicArn = event.Records[0].Sns.TopicArn;
var LambdaReceiveTime = new Date().toString();
var itemParams = {Item: {SnsTopicArn: {S: SnsTopicArn},
SnsPublishTime: {S: SnsPublishTime}, SnsMessageId: {S: SnsMessageId},
LambdaReceiveTime: {S: LambdaReceiveTime} }};
ddb.putItem(itemParams, function() {
context.done(null,'');
});
};
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "Creates the DynamoDB Table needed for the example",
"Resources" : {
"snslambda" : {
"Type" : "AWS::DynamoDB::Table",
"Properties" : {
"AttributeDefinitions": [
{
"AttributeName" : "SnsTopicArn",
"AttributeType" : "S"
},
{
"AttributeName" : "SnsPublishTime",
"AttributeType" : "S"
}
],
"KeySchema": [
{ "AttributeName": "SnsTopicArn", "KeyType": "HASH" },
{ "AttributeName": "SnsPublishTime", "KeyType": "RANGE" }
],
"ProvisionedThroughput" : {
"ReadCapacityUnits" : 5,
"WriteCapacityUnits" : 5
}
}
}
},
"Outputs" : {
"TableName" : {
"Value" : {"Ref" : "snslambda"},
"Description" : "Table name of the newly created DynamoDB table"
}
}
}
......@@ -5,8 +5,8 @@ from setuptools import setup, find_packages
import os
requires = [
'botocore==0.94.0',
'click==3.3',
'boto3==0.0.15',
'click==4.0',
'PyYAML>=3.11'
]
......