Mitch Garnaat

Adding samples directory and add-event-source command plus polling after create/updating CF stack.

......@@ -2,3 +2,4 @@ include README.md
include LICENSE
include requirements.txt
include kappa/_version
recursive-include samples *.js *.yml *.cf *.json
......
......@@ -64,7 +64,7 @@ def set_debug_logger(logger_names=['kappa'], stream=None):
@click.argument(
'command',
required=True,
type=click.Choice(['deploy', 'test', 'tail', 'delete'])
type=click.Choice(['deploy', 'test', 'tail', 'add-event-source', 'delete'])
)
def main(config=None, debug=False, dryrun=False, command=None):
if debug:
......@@ -79,6 +79,8 @@ def main(config=None, debug=False, dryrun=False, command=None):
kappa.tail()
elif command == 'delete':
kappa.delete()
elif command == 'add-event-source':
kappa.add_event_source()
if __name__ == '__main__':
......
......@@ -14,6 +14,7 @@
import logging
import os
import zipfile
import time
import botocore.session
from botocore.exceptions import ClientError
......@@ -23,6 +24,8 @@ LOG = logging.getLogger(__name__)
class Kappa(object):
completed_states = ('CREATE_COMPLETE', 'UPDATE_COMPLETE')
def __init__(self, config):
self.config = config
self.session = botocore.session.get_session()
......@@ -52,6 +55,14 @@ class Kappa(object):
response = cfn.create_stack(
StackName=stack_name, TemplateBody=template_body,
Capabilities=['CAPABILITY_IAM'])
done = False
while not done:
response = cfn.describe_stacks(StackName=stack_name)
status = response['Stacks'][0]['StackStatus']
LOG.debug('Stack status is: %s', status)
if status in self.completed_states:
done = True
time.sleep(1)
def get_role_arn(self, role_name):
role_arn = None
......@@ -157,6 +168,20 @@ class Kappa(object):
for log_event in response['events']:
print('%s: %s' % (log_event['timestamp'], log_event['message']))
def add_event_source(self):
lambda_svc = self.session.create_client('lambda', self.region)
try:
invoke_role = self.get_role_arn(
self.config['cloudformation']['invoke_role'])
response = lambda_svc.add_event_source(
FunctionName=self.config['lambda']['name'],
Role=invoke_role,
EventSource=self.config['lambda']['event_source'],
BatchSize=self.config['lambda'].get('batch_size', 100))
LOG.debug(response)
except Exception:
LOG.exception('Unable to add event source')
def deploy(self):
self.create_update_roles(
self.config['cloudformation']['stack_name'],
......
console.log('Loading event');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, ' '));
for(i = 0; i < event.Records.length; ++i) {
encodedPayload = event.Records[i].kinesis.data;
payload = new Buffer(encodedPayload, 'base64').toString('ascii');
console.log("Decoded payload: " + payload);
}
context.done(null, "Hello World"); // SUCCESS with message
};
---
profile: personal
region: us-east-1
cloudformation:
template: roles.cf
stack_name: TestKinesis
exec_role: ExecRole
invoke_role: InvokeRole
lambda:
name: KinesisSample
zipfile_name: KinesisSample.zip
description: Testing Kinesis Lambda handler
path: ProcessKinesisRecords.js
handler: ProcessKinesisRecords.handler
runtime: nodejs
memory_size: 128
timeout: 3
mode: event
event_source: arn:aws:kinesis:us-east-1:084307701560:stream/lambdastream
test_data: input.json
\ No newline at end of file
{
"Records": [
{
"kinesis": {
"partitionKey": "partitionKey-3",
"kinesisSchemaVersion": "1.0",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
"sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
},
"eventSource": "aws:kinesis",
"eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
"invokeIdentityArn": "arn:aws:iam::059493405231:role/testLEBRole",
"eventVersion": "1.0",
"eventName": "aws:kinesis:record",
"eventSourceARN": "arn:aws:kinesis:us-east-1:35667example:stream/examplestream",
"awsRegion": "us-east-1"
}
]
}
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"ExecRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "lambda.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ]
} ]
}
}
},
"ExecRolePolicies": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyName": "ExecRolePolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
} ]
},
"Roles": [ { "Ref": "ExecRole" } ]
}
},
"InvokeRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version" : "2012-10-17",
"Statement": [ {
"Effect": "Allow",
"Principal": {
"Service": [ "s3.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ],
"Condition": {
"ArnLike": {
"sts:ExternalId": "arn:aws:s3:::*"
}
}
},
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
} ]
}
}
},
"InvokeRolePolicies": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyName": "ExecRolePolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Effect":"Allow",
"Action":[
"lambda:InvokeFunction",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams"
],
"Resource":[
"*"
]
}
]
},
"Roles": [ { "Ref": "InvokeRole" } ]
}
}
}
}