Jose Diaz-Gonzalez
Committed by GitHub

Merge pull request #69 from coopernurse/cloudwatch-events

CloudWatch events - Fixes #52
......@@ -26,6 +26,7 @@ import kappa.event_source.dynamodb_stream
import kappa.event_source.kinesis
import kappa.event_source.s3
import kappa.event_source.sns
import kappa.event_source.cloudwatch
import kappa.policy
import kappa.role
import kappa.awsclient
......@@ -181,6 +182,7 @@ class Context(object):
'kinesis': kappa.event_source.kinesis.KinesisEventSource,
's3': kappa.event_source.s3.S3EventSource,
'sns': kappa.event_source.sns.SNSEventSource,
'events': kappa.event_source.cloudwatch.CloudWatchEventSource
}
for event_source_cfg in event_sources:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
......@@ -226,7 +228,7 @@ class Context(object):
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
LOG.debug('Waiting for policy/role propogation')
LOG.debug('Waiting for policy/role propagation')
time.sleep(5)
self.function.create()
self.add_event_sources()
......@@ -239,6 +241,7 @@ class Context(object):
self.function.deploy()
if self.restapi:
self.restapi.deploy()
self.add_event_sources()
def invoke(self, data):
return self.function.invoke(data)
......
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kappa.event_source.base
import logging
import uuid
LOG = logging.getLogger(__name__)
class CloudWatchEventSource(kappa.event_source.base.EventSource):
def __init__(self, context, config):
super(CloudWatchEventSource, self).__init__(context, config)
self._events = kappa.awsclient.create_client('events', context.session)
self._lambda = kappa.awsclient.create_client('lambda', context.session)
self._name = config['arn'].split('/')[-1]
self._context = context
self._config = config
def get_rule(self):
response = self._events.call('list_rules', NamePrefix=self._name)
LOG.debug(response)
if 'Rules' in response:
for r in response['Rules']:
if r['Name'] == self._name:
return r
return None
def add(self, function):
kwargs = {
'Name': self._name,
'State': 'ENABLED' if self.enabled else 'DISABLED'
}
if 'schedule' in self._config:
kwargs['ScheduleExpression'] = self._config['schedule']
if 'pattern' in self._config:
kwargs['EventPattern'] = self._config['pattern']
if 'description' in self._config:
kwargs['Description'] = self._config['description']
if 'role_arn' in self._config:
kwargs['RoleArn'] = self._config['role_arn']
try:
response = self._events.call('put_rule', **kwargs)
LOG.debug(response)
self._config['arn'] = response['RuleArn']
response = self._lambda.call('add_permission',
FunctionName=function.name,
StatementId=str(uuid.uuid4()),
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=response['RuleArn'])
LOG.debug(response)
response = self._events.call('put_targets',
Rule=self._name,
Targets=[{
'Id': function.name,
'Arn': function.arn
}])
LOG.debug(response)
except Exception:
LOG.exception('Unable to put CloudWatch event source')
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing CloudWatch event source')
try:
rule = self.get_rule()
if rule:
response = self._events.call('remove_targets',
Rule=self._name,
Ids=[function.name])
LOG.debug(response)
response = self._events.call('delete_rule',
Name=self._name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove CloudWatch event source %s', self._name)
def status(self, function):
LOG.debug('status for CloudWatch event for %s', function.name)
return self._to_status(self.get_rule())
def enable(self, function):
if self.get_rule():
self._events.call('enable_rule', Name=self._name)
def disable(self, function):
if self.get_rule():
self._events.call('disable_rule', Name=self._name)
def _to_status(self, rule):
if rule:
return {
'EventSourceArn': rule['Arn'],
'State': rule['State']
}
return None
.kappa/
kappa.yml
*.zip
import logging
import time
LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
def handler(event, context):
LOG.debug(event)
return {'status': 'success', 'time': time.time()}
{
"foo": "bar",
"fie": "baz"
}
---
name: kappa-cron
environments:
dev:
profile: <your profile here>
region: <your region here>
policy:
resources:
- arn: arn:aws:logs:*:*:*
actions:
- "*"
event_sources:
- arn: arn:aws:events:<your region here>:<your account id>:rule/kappa-cron-dev
schedule: rate(1 minute)
description: cron to run this lambda function every minute
enabled: true
lambda:
description: Kappa sample lambda that runs every minute
handler: simple.handler
runtime: python2.7
memory_size: 128
timeout: 3