James Cooper

Add CloudWatchEventSource

......@@ -26,6 +26,7 @@ import kappa.event_source.dynamodb_stream
import kappa.event_source.kinesis
import kappa.event_source.s3
import kappa.event_source.sns
import kappa.event_source.cloudwatch
import kappa.policy
import kappa.role
import kappa.awsclient
......@@ -181,6 +182,7 @@ class Context(object):
'kinesis': kappa.event_source.kinesis.KinesisEventSource,
's3': kappa.event_source.s3.S3EventSource,
'sns': kappa.event_source.sns.SNSEventSource,
'events': kappa.event_source.cloudwatch.CloudWatchEventSource
}
for event_source_cfg in event_sources:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
......@@ -226,7 +228,7 @@ class Context(object):
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
LOG.debug('Waiting for policy/role propogation')
LOG.debug('Waiting for policy/role propagation')
time.sleep(5)
self.function.create()
self.add_event_sources()
......@@ -239,6 +241,7 @@ class Context(object):
self.function.deploy()
if self.restapi:
self.restapi.deploy()
self.add_event_sources()
def invoke(self, data):
return self.function.invoke(data)
......
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid
from botocore.exceptions import ClientError
import kappa.awsclient
LOG = logging.getLogger(__name__)
class EventSource(object):
def __init__(self, context, config):
self._context = context
self._config = config
@property
def arn(self):
return self._config['arn']
@property
def starting_position(self):
return self._config.get('starting_position', 'LATEST')
@property
def batch_size(self):
return self._config.get('batch_size', 100)
@property
def enabled(self):
return self._config.get('enabled', False)
class KinesisEventSource(EventSource):
def __init__(self, context, config):
super(KinesisEventSource, self).__init__(context, config)
self._lambda = kappa.awsclient.create_client(
'lambda', context.session)
def _get_uuid(self, function):
uuid = None
response = self._lambda.call(
'list_event_source_mappings',
FunctionName=function.name,
EventSourceArn=self.arn)
LOG.debug(response)
if len(response['EventSourceMappings']) > 0:
uuid = response['EventSourceMappings'][0]['UUID']
return uuid
def add(self, function):
try:
response = self._lambda.call(
'create_event_source_mapping',
FunctionName=function.name,
EventSourceArn=self.arn,
BatchSize=self.batch_size,
StartingPosition=self.starting_position,
Enabled=self.enabled
)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add event source')
def enable(self, function):
self._config['enabled'] = True
try:
response = self._lambda.call(
'update_event_source_mapping',
UUID=self._get_uuid(function),
Enabled=self.enabled
)
LOG.debug(response)
except Exception:
LOG.exception('Unable to enable event source')
def disable(self, function):
self._config['enabled'] = False
try:
response = self._lambda.call(
'update_event_source_mapping',
FunctionName=function.name,
Enabled=self.enabled
)
LOG.debug(response)
except Exception:
LOG.exception('Unable to disable event source')
def update(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.call(
'update_event_source_mapping',
BatchSize=self.batch_size,
Enabled=self.enabled,
FunctionName=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update event source')
def remove(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
response = self._lambda.call(
'delete_event_source_mapping',
UUID=uuid)
LOG.debug(response)
return response
def status(self, function):
response = None
LOG.debug('getting status for event source %s', self.arn)
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.call(
'get_event_source_mapping',
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
LOG.debug('event source %s does not exist', self.arn)
response = None
else:
LOG.debug('No UUID for event source %s', self.arn)
return response
class DynamoDBStreamEventSource(KinesisEventSource):
pass
class S3EventSource(EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
self._s3 = kappa.awsclient.create_client('s3', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def _get_bucket_name(self):
return self.arn.split(':')[-1]
def add(self, function):
notification_spec = {
'LambdaFunctionConfigurations': [
{
'Id': self._make_notification_id(function.name),
'Events': [e for e in self._config['events']],
'LambdaFunctionArn': function.arn,
}
]
}
try:
response = self._s3.call(
'put_bucket_notification_configuration',
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
except Exception as exc:
LOG.debug(exc.response)
LOG.exception('Unable to add S3 event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing s3 notification')
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' in response:
fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
if fn_arn == function.arn:
del response['CloudFunctionConfiguration']
del response['ResponseMetadata']
response = self._s3.call(
'put_bucket_notification',
Bucket=self._get_bucket_name(),
NotificationConfiguration=response)
LOG.debug(response)
disable = remove
def status(self, function):
LOG.debug('status for s3 notification for %s', function.name)
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' not in response:
response = None
return response
class SNSEventSource(EventSource):
def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
self._sns = kappa.awsclient.create_client('sns', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def exists(self, function):
try:
response = self._sns.call(
'list_subscriptions_by_topic',
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
if subscription['Endpoint'] == function.arn:
return subscription
return None
except Exception:
LOG.exception('Unable to find event source %s', self.arn)
def add(self, function):
try:
response = self._sns.call(
'subscribe',
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add SNS event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing SNS event source')
try:
subscription = self.exists(function)
if subscription:
response = self._sns.call(
'unsubscribe',
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove event source %s', self.arn)
disable = remove
def status(self, function):
LOG.debug('status for SNS notification for %s', function.name)
status = self.exists(function)
if status:
status['EventSourceArn'] = status['TopicArn']
return status
class CloudWatchEventSource(EventSource):
def __init__(self, context, config):
super(CloudWatchEventSource, self).__init__(context, config)
self._events = kappa.awsclient.create_client('events', context.session)
self._lambda = kappa.awsclient.create_client('lambda', context.session)
self._name = config['name']
self._context = context
self._config = config
def find(self):
response = self._events.call('list_rules', NamePrefix=self._name)
LOG.debug(response)
if 'Rules' in response:
for r in response['Rules']:
if r['Name'] == self._name:
return r
return None
def add(self, function):
kwargs = {
'Name': self._name,
'State': 'ENABLED' if self.enabled else 'DISABLED'
}
if 'schedule' in self._config:
kwargs['ScheduleExpression'] = self._config['schedule']
if 'pattern' in self._config:
kwargs['EventPattern'] = self._config['pattern']
if 'description' in self._config:
kwargs['Description'] = self._config['description']
if 'role_arn' in self._config:
kwargs['RoleArn'] = self._config['role_arn']
try:
response = self._events.call('put_rule', **kwargs)
LOG.debug(response)
self._config['arn'] = response['RuleArn']
self._lambda.call('add_permission',
FunctionName=function.name,
StatementId=str(uuid.uuid4()),
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=response['RuleArn'])
response = self._events.call('put_targets',
Rule=self._name,
Targets=[{
'Id': '1',
'Arn': function.arn
}])
LOG.debug(response)
except Exception:
LOG.exception('Unable to put CloudWatch event source')
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing CloudWatch event source')
try:
rule = self.find()
if rule:
response = self._events.call(
'delete_rule',
Name=self._name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove CloudWatch event source %s', self._name)
def status(self, function):
LOG.debug('status for CloudWatch event for %s', function.name)
return self._to_status(self.find())
def enable(self, function):
if self.find():
self._events.call('enable_rule', Name=self._name)
def disable(self, function):
if self.find():
self._events.call('disable_rule', Name=self._name)
def _to_status(self, rule):
if rule:
return {
'EventSourceArn': rule['Arn'],
'State': rule['State']
}
else:
return None
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kappa.event_source.base
import logging
import uuid
LOG = logging.getLogger(__name__)
class CloudWatchEventSource(kappa.event_source.base.EventSource):
def __init__(self, context, config):
super(CloudWatchEventSource, self).__init__(context, config)
self._events = kappa.awsclient.create_client('events', context.session)
self._lambda = kappa.awsclient.create_client('lambda', context.session)
self._name = config['arn'].split('/')[-1]
self._context = context
self._config = config
def get_rule(self):
response = self._events.call('list_rules', NamePrefix=self._name)
LOG.debug(response)
if 'Rules' in response:
for r in response['Rules']:
if r['Name'] == self._name:
return r
return None
def add(self, function):
kwargs = {
'Name': self._name,
'State': 'ENABLED' if self.enabled else 'DISABLED'
}
if 'schedule' in self._config:
kwargs['ScheduleExpression'] = self._config['schedule']
if 'pattern' in self._config:
kwargs['EventPattern'] = self._config['pattern']
if 'description' in self._config:
kwargs['Description'] = self._config['description']
if 'role_arn' in self._config:
kwargs['RoleArn'] = self._config['role_arn']
try:
response = self._events.call('put_rule', **kwargs)
LOG.debug(response)
self._config['arn'] = response['RuleArn']
response = self._lambda.call('add_permission',
FunctionName=function.name,
StatementId=str(uuid.uuid4()),
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=response['RuleArn'])
LOG.debug(response)
response = self._events.call('put_targets',
Rule=self._name,
Targets=[{
'Id': function.name,
'Arn': function.arn
}])
LOG.debug(response)
except Exception:
LOG.exception('Unable to put CloudWatch event source')
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing CloudWatch event source')
try:
rule = self.get_rule()
if rule:
response = self._events.call('remove_targets',
Rule=self._name,
Ids=[function.name])
LOG.debug(response)
response = self._events.call('delete_rule',
Name=self._name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove CloudWatch event source %s', self._name)
def status(self, function):
LOG.debug('status for CloudWatch event for %s', function.name)
return self._to_status(self.get_rule())
def enable(self, function):
if self.get_rule():
self._events.call('enable_rule', Name=self._name)
def disable(self, function):
if self.get_rule():
self._events.call('disable_rule', Name=self._name)
def _to_status(self, rule):
if rule:
return {
'EventSourceArn': rule['Arn'],
'State': rule['State']
}
else:
return None