Jose Diaz-Gonzalez
Committed by GitHub

Merge pull request #67 from garnaat/event-source-refactor

Event source refactor
......@@ -18,10 +18,14 @@ import yaml
import time
import os
import shutil
import sys
import kappa.function
import kappa.restapi
import kappa.event_source
import kappa.event_source.dynamodb_stream
import kappa.event_source.kinesis
import kappa.event_source.s3
import kappa.event_source.sns
import kappa.policy
import kappa.role
import kappa.awsclient
......@@ -31,7 +35,7 @@ import placebo
LOG = logging.getLogger(__name__)
DebugFmtString = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
InfoFmtString = '...%(message)s'
InfoFmtString = '-> %(message)s'
class Context(object):
......@@ -45,6 +49,13 @@ class Context(object):
self._load_cache()
self.config = yaml.load(config_file)
self.environment = environment
if self.environment not in self.config.get('environments', {}):
message = 'Invalid environment {0} specified'.format(
self.environment)
LOG.error(message)
sys.exit(1)
profile = self.config['environments'][self.environment]['profile']
region = self.config['environments'][self.environment]['region']
self.session = kappa.awsclient.create_session(profile, region)
......@@ -158,27 +169,27 @@ class Context(object):
def _create_event_sources(self):
env_cfg = self.config['environments'][self.environment]
if 'event_sources' in env_cfg:
for event_source_cfg in env_cfg['event_sources']:
if 'event_sources' not in env_cfg:
return
event_sources = env_cfg.get('event_sources', {})
if not event_sources:
return
event_source_map = {
'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource,
'kinesis': kappa.event_source.kinesis.KinesisEventSource,
's3': kappa.event_source.s3.S3EventSource,
'sns': kappa.event_source.sns.SNSEventSource,
}
for event_source_cfg in event_sources:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
kappa.event_source.KinesisEventSource(
self, event_source_cfg))
elif svc == 's3':
self.event_sources.append(kappa.event_source.S3EventSource(
self, event_source_cfg))
elif svc == 'sns':
event_source = event_source_map.get(svc, None)
if not event_source:
raise ValueError('Unknown event source: {0}'.format(
event_source_cfg['arn']))
self.event_sources.append(
kappa.event_source.SNSEventSource(
self, event_source_cfg))
elif svc == 'dynamodb':
self.event_sources.append(
kappa.event_source.DynamoDBStreamEventSource(
self, event_source_cfg))
else:
msg = 'Unknown event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
event_source(self, event_source_cfg))
def add_event_sources(self):
for event_source in self.event_sources:
......
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class EventSource(object):
def __init__(self, context, config):
self._context = context
self._config = config
@property
def arn(self):
return self._config['arn']
@property
def starting_position(self):
return self._config.get('starting_position', 'LATEST')
@property
def batch_size(self):
return self._config.get('batch_size', 100)
@property
def enabled(self):
return self._config.get('enabled', False)
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kappa.event_source.kinesis
class DynamoDBStreamEventSource(kappa.event_source.kinesis.KinesisEventSource):
pass
......@@ -13,39 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import botocore.exceptions
import kappa.event_source.base
import logging
from botocore.exceptions import ClientError
import kappa.awsclient
LOG = logging.getLogger(__name__)
class EventSource(object):
def __init__(self, context, config):
self._context = context
self._config = config
@property
def arn(self):
return self._config['arn']
@property
def starting_position(self):
return self._config.get('starting_position', 'LATEST')
@property
def batch_size(self):
return self._config.get('batch_size', 100)
@property
def enabled(self):
return self._config.get('enabled', False)
class KinesisEventSource(EventSource):
class KinesisEventSource(kappa.event_source.base.EventSource):
def __init__(self, context, config):
super(KinesisEventSource, self).__init__(context, config)
......@@ -135,140 +110,9 @@ class KinesisEventSource(EventSource):
'get_event_source_mapping',
UUID=self._get_uuid(function))
LOG.debug(response)
except ClientError:
except botocore.exceptions.ClientError:
LOG.debug('event source %s does not exist', self.arn)
response = None
else:
LOG.debug('No UUID for event source %s', self.arn)
return response
class DynamoDBStreamEventSource(KinesisEventSource):
pass
class S3EventSource(EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
self._s3 = kappa.awsclient.create_client('s3', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def _get_bucket_name(self):
return self.arn.split(':')[-1]
def add(self, function):
notification_spec = {
'LambdaFunctionConfigurations': [
{
'Id': self._make_notification_id(function.name),
'Events': [e for e in self._config['events']],
'LambdaFunctionArn': function.arn,
}
]
}
try:
response = self._s3.call(
'put_bucket_notification_configuration',
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
except Exception as exc:
LOG.debug(exc.response)
LOG.exception('Unable to add S3 event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing s3 notification')
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' in response:
fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
if fn_arn == function.arn:
del response['CloudFunctionConfiguration']
del response['ResponseMetadata']
response = self._s3.call(
'put_bucket_notification',
Bucket=self._get_bucket_name(),
NotificationConfiguration=response)
LOG.debug(response)
disable = remove
def status(self, function):
LOG.debug('status for s3 notification for %s', function.name)
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' not in response:
response = None
return response
class SNSEventSource(EventSource):
def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
self._sns = kappa.awsclient.create_client('sns', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def exists(self, function):
try:
response = self._sns.call(
'list_subscriptions_by_topic',
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
if subscription['Endpoint'] == function.arn:
return subscription
return None
except Exception:
LOG.exception('Unable to find event source %s', self.arn)
def add(self, function):
try:
response = self._sns.call(
'subscribe',
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add SNS event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing SNS event source')
try:
subscription = self.exists(function)
if subscription:
response = self._sns.call(
'unsubscribe',
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove event source %s', self.arn)
disable = remove
def status(self, function):
LOG.debug('status for SNS notification for %s', function.name)
status = self.exists(function)
if status:
status['EventSourceArn'] = status['TopicArn']
return status
......
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kappa.event_source.base
import logging
LOG = logging.getLogger(__name__)
class S3EventSource(kappa.event_source.base.EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
self._s3 = kappa.awsclient.create_client('s3', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def _get_bucket_name(self):
return self.arn.split(':')[-1]
def add(self, function):
notification_spec = {
'LambdaFunctionConfigurations': [
{
'Id': self._make_notification_id(function.name),
'Events': [e for e in self._config['events']],
'LambdaFunctionArn': function.arn,
}
]
}
try:
response = self._s3.call(
'put_bucket_notification_configuration',
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
except Exception as exc:
LOG.debug(exc.response)
LOG.exception('Unable to add S3 event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing s3 notification')
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' in response:
fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
if fn_arn == function.arn:
del response['CloudFunctionConfiguration']
del response['ResponseMetadata']
response = self._s3.call(
'put_bucket_notification',
Bucket=self._get_bucket_name(),
NotificationConfiguration=response)
LOG.debug(response)
disable = remove
def status(self, function):
LOG.debug('status for s3 notification for %s', function.name)
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' not in response:
response = None
return response
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kappa.awsclient
import kappa.event_source.base
import logging
LOG = logging.getLogger(__name__)
class SNSEventSource(kappa.event_source.base.EventSource):
def __init__(self, context, config):
super(SNSEventSource, self).__init__(context, config)
self._sns = kappa.awsclient.create_client('sns', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def exists(self, function):
try:
response = self._sns.call(
'list_subscriptions_by_topic',
TopicArn=self.arn)
LOG.debug(response)
for subscription in response['Subscriptions']:
if subscription['Endpoint'] == function.arn:
return subscription
return None
except Exception:
LOG.exception('Unable to find event source %s', self.arn)
def add(self, function):
try:
response = self._sns.call(
'subscribe',
TopicArn=self.arn, Protocol='lambda',
Endpoint=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add SNS event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing SNS event source')
try:
subscription = self.exists(function)
if subscription:
response = self._sns.call(
'unsubscribe',
SubscriptionArn=subscription['SubscriptionArn'])
LOG.debug(response)
except Exception:
LOG.exception('Unable to remove event source %s', self.arn)
disable = remove
def status(self, function):
LOG.debug('status for SNS notification for %s', function.name)
status = self.exists(function)
if status:
status['EventSourceArn'] = status['TopicArn']
return status