s3.py
3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Mitch Garnaat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kappa.event_source.base
import logging
LOG = logging.getLogger(__name__)
class S3EventSource(kappa.event_source.base.EventSource):
def __init__(self, context, config):
super(S3EventSource, self).__init__(context, config)
self._s3 = kappa.awsclient.create_client('s3', context.session)
def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
def _get_bucket_name(self):
return self.arn.split(':')[-1]
def add(self, function):
notification_spec = {
'LambdaFunctionConfigurations': [
{
'Id': self._make_notification_id(function.name),
'Events': [e for e in self._config['events']],
'LambdaFunctionArn': '%s:%s' % (function.arn, function._context.environment),
}
]
}
# Add S3 key filters
if 'key_filters' in self._config:
filters_spec = { 'Key' : { 'FilterRules' : [] } }
for filter in self._config['key_filters']:
if 'type' in filter and 'value' in filter and filter['type'] in ('prefix', 'suffix'):
rule = { 'Name' : filter['type'], 'Value' : filter['value'] }
filters_spec['Key']['FilterRules'].append(rule)
notification_spec['LambdaFunctionConfigurations'][0]['Filter'] = filters_spec
try:
response = self._s3.call(
'put_bucket_notification_configuration',
Bucket=self._get_bucket_name(),
NotificationConfiguration=notification_spec)
LOG.debug(response)
except Exception as exc:
LOG.debug(exc.response)
LOG.exception('Unable to add S3 event source')
enable = add
def update(self, function):
self.add(function)
def remove(self, function):
LOG.debug('removing s3 notification')
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' in response:
fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
if fn_arn == function.arn:
del response['CloudFunctionConfiguration']
del response['ResponseMetadata']
response = self._s3.call(
'put_bucket_notification',
Bucket=self._get_bucket_name(),
NotificationConfiguration=response)
LOG.debug(response)
disable = remove
def status(self, function):
LOG.debug('status for s3 notification for %s', function.name)
response = self._s3.call(
'get_bucket_notification',
Bucket=self._get_bucket_name())
LOG.debug(response)
if 'CloudFunctionConfiguration' not in response:
return None
return {
'EventSourceArn': response['CloudFunctionConfiguration']['CloudFunction'],
'State': 'Enabled'
}