Jose Diaz-Gonzalez

refactor event sources into their own modules

...@@ -21,7 +21,10 @@ import shutil ...@@ -21,7 +21,10 @@ import shutil
21 21
22 import kappa.function 22 import kappa.function
23 import kappa.restapi 23 import kappa.restapi
24 -import kappa.event_source 24 +import kappa.event_source.dynamodb_stream as dynamodb_stream
25 +import kappa.event_source.kinesis as kinesis
26 +import kappa.event_source.s3 as s3
27 +import kappa.event_source.sns as sns
25 import kappa.policy 28 import kappa.policy
26 import kappa.role 29 import kappa.role
27 import kappa.awsclient 30 import kappa.awsclient
...@@ -159,23 +162,23 @@ class Context(object): ...@@ -159,23 +162,23 @@ class Context(object):
159 def _create_event_sources(self): 162 def _create_event_sources(self):
160 env_cfg = self.config['environments'][self.environment] 163 env_cfg = self.config['environments'][self.environment]
161 if 'event_sources' in env_cfg: 164 if 'event_sources' in env_cfg:
162 - for event_source_cfg in env_cfg['event_sources']: 165 + event_sources = env_cfg.get('event_sources', {})
166 + if not event_sources:
167 + return
168 + for event_source_cfg in event_sources:
163 _, _, svc, _ = event_source_cfg['arn'].split(':', 3) 169 _, _, svc, _ = event_source_cfg['arn'].split(':', 3)
164 if svc == 'kinesis': 170 if svc == 'kinesis':
165 self.event_sources.append( 171 self.event_sources.append(
166 - kappa.event_source.KinesisEventSource( 172 + kinesis.KinesisEventSource(self, event_source_cfg))
167 - self, event_source_cfg))
168 elif svc == 's3': 173 elif svc == 's3':
169 - self.event_sources.append(kappa.event_source.S3EventSource( 174 + self.event_sources.append(
170 - self, event_source_cfg)) 175 + s3.S3EventSource(self, event_source_cfg))
171 elif svc == 'sns': 176 elif svc == 'sns':
172 self.event_sources.append( 177 self.event_sources.append(
173 - kappa.event_source.SNSEventSource( 178 + sns.SNSEventSource(self, event_source_cfg))
174 - self, event_source_cfg))
175 elif svc == 'dynamodb': 179 elif svc == 'dynamodb':
176 self.event_sources.append( 180 self.event_sources.append(
177 - kappa.event_source.DynamoDBStreamEventSource( 181 + dynamodb_stream.DynamoDBStreamEventSource(self, event_source_cfg))
178 - self, event_source_cfg))
179 else: 182 else:
180 msg = 'Unknown event source: %s' % event_source_cfg['arn'] 183 msg = 'Unknown event source: %s' % event_source_cfg['arn']
181 raise ValueError(msg) 184 raise ValueError(msg)
......
1 +# -*- coding: utf-8 -*-
2 +# Copyright (c) 2014, 2015 Mitch Garnaat
3 +#
4 +# Licensed under the Apache License, Version 2.0 (the "License");
5 +# you may not use this file except in compliance with the License.
6 +# You may obtain a copy of the License at
7 +#
8 +# http://www.apache.org/licenses/LICENSE-2.0
9 +#
10 +# Unless required by applicable law or agreed to in writing, software
11 +# distributed under the License is distributed on an "AS IS" BASIS,
12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 +# See the License for the specific language governing permissions and
14 +# limitations under the License.
1 +# -*- coding: utf-8 -*-
2 +# Copyright (c) 2014, 2015 Mitch Garnaat
3 +#
4 +# Licensed under the Apache License, Version 2.0 (the "License");
5 +# you may not use this file except in compliance with the License.
6 +# You may obtain a copy of the License at
7 +#
8 +# http://www.apache.org/licenses/LICENSE-2.0
9 +#
10 +# Unless required by applicable law or agreed to in writing, software
11 +# distributed under the License is distributed on an "AS IS" BASIS,
12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 +# See the License for the specific language governing permissions and
14 +# limitations under the License.
15 +
16 +
17 +class EventSource(object):
18 +
19 + def __init__(self, context, config):
20 + self._context = context
21 + self._config = config
22 +
23 + @property
24 + def arn(self):
25 + return self._config['arn']
26 +
27 + @property
28 + def starting_position(self):
29 + return self._config.get('starting_position', 'LATEST')
30 +
31 + @property
32 + def batch_size(self):
33 + return self._config.get('batch_size', 100)
34 +
35 + @property
36 + def enabled(self):
37 + return self._config.get('enabled', False)
1 +# -*- coding: utf-8 -*-
2 +# Copyright (c) 2014, 2015 Mitch Garnaat
3 +#
4 +# Licensed under the Apache License, Version 2.0 (the "License");
5 +# you may not use this file except in compliance with the License.
6 +# You may obtain a copy of the License at
7 +#
8 +# http://www.apache.org/licenses/LICENSE-2.0
9 +#
10 +# Unless required by applicable law or agreed to in writing, software
11 +# distributed under the License is distributed on an "AS IS" BASIS,
12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 +# See the License for the specific language governing permissions and
14 +# limitations under the License.
15 +
16 +import kappa.event_source.kinesis
17 +
18 +
19 +class DynamoDBStreamEventSource(kappa.event_source.kinesis.KinesisEventSource):
20 +
21 + pass
...@@ -13,39 +13,14 @@ ...@@ -13,39 +13,14 @@
13 # See the License for the specific language governing permissions and 13 # See the License for the specific language governing permissions and
14 # limitations under the License. 14 # limitations under the License.
15 15
16 +import botocore.exceptions
17 +import kappa.event_source.base
16 import logging 18 import logging
17 19
18 -from botocore.exceptions import ClientError
19 -
20 -import kappa.awsclient
21 -
22 LOG = logging.getLogger(__name__) 20 LOG = logging.getLogger(__name__)
23 21
24 22
25 -class EventSource(object): 23 +class KinesisEventSource(kappa.event_source.base.EventSource):
26 -
27 - def __init__(self, context, config):
28 - self._context = context
29 - self._config = config
30 -
31 - @property
32 - def arn(self):
33 - return self._config['arn']
34 -
35 - @property
36 - def starting_position(self):
37 - return self._config.get('starting_position', 'LATEST')
38 -
39 - @property
40 - def batch_size(self):
41 - return self._config.get('batch_size', 100)
42 -
43 - @property
44 - def enabled(self):
45 - return self._config.get('enabled', False)
46 -
47 -
48 -class KinesisEventSource(EventSource):
49 24
50 def __init__(self, context, config): 25 def __init__(self, context, config):
51 super(KinesisEventSource, self).__init__(context, config) 26 super(KinesisEventSource, self).__init__(context, config)
...@@ -135,140 +110,9 @@ class KinesisEventSource(EventSource): ...@@ -135,140 +110,9 @@ class KinesisEventSource(EventSource):
135 'get_event_source_mapping', 110 'get_event_source_mapping',
136 UUID=self._get_uuid(function)) 111 UUID=self._get_uuid(function))
137 LOG.debug(response) 112 LOG.debug(response)
138 - except ClientError: 113 + except botocore.exceptions.ClientError:
139 LOG.debug('event source %s does not exist', self.arn) 114 LOG.debug('event source %s does not exist', self.arn)
140 response = None 115 response = None
141 else: 116 else:
142 LOG.debug('No UUID for event source %s', self.arn) 117 LOG.debug('No UUID for event source %s', self.arn)
143 return response 118 return response
144 -
145 -
146 -class DynamoDBStreamEventSource(KinesisEventSource):
147 -
148 - pass
149 -
150 -
151 -class S3EventSource(EventSource):
152 -
153 - def __init__(self, context, config):
154 - super(S3EventSource, self).__init__(context, config)
155 - self._s3 = kappa.awsclient.create_client('s3', context.session)
156 -
157 - def _make_notification_id(self, function_name):
158 - return 'Kappa-%s-notification' % function_name
159 -
160 - def _get_bucket_name(self):
161 - return self.arn.split(':')[-1]
162 -
163 - def add(self, function):
164 - notification_spec = {
165 - 'LambdaFunctionConfigurations': [
166 - {
167 - 'Id': self._make_notification_id(function.name),
168 - 'Events': [e for e in self._config['events']],
169 - 'LambdaFunctionArn': function.arn,
170 - }
171 - ]
172 - }
173 - try:
174 - response = self._s3.call(
175 - 'put_bucket_notification_configuration',
176 - Bucket=self._get_bucket_name(),
177 - NotificationConfiguration=notification_spec)
178 - LOG.debug(response)
179 - except Exception as exc:
180 - LOG.debug(exc.response)
181 - LOG.exception('Unable to add S3 event source')
182 -
183 - enable = add
184 -
185 - def update(self, function):
186 - self.add(function)
187 -
188 - def remove(self, function):
189 - LOG.debug('removing s3 notification')
190 - response = self._s3.call(
191 - 'get_bucket_notification',
192 - Bucket=self._get_bucket_name())
193 - LOG.debug(response)
194 - if 'CloudFunctionConfiguration' in response:
195 - fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
196 - if fn_arn == function.arn:
197 - del response['CloudFunctionConfiguration']
198 - del response['ResponseMetadata']
199 - response = self._s3.call(
200 - 'put_bucket_notification',
201 - Bucket=self._get_bucket_name(),
202 - NotificationConfiguration=response)
203 - LOG.debug(response)
204 -
205 - disable = remove
206 -
207 - def status(self, function):
208 - LOG.debug('status for s3 notification for %s', function.name)
209 - response = self._s3.call(
210 - 'get_bucket_notification',
211 - Bucket=self._get_bucket_name())
212 - LOG.debug(response)
213 - if 'CloudFunctionConfiguration' not in response:
214 - response = None
215 - return response
216 -
217 -
218 -class SNSEventSource(EventSource):
219 -
220 - def __init__(self, context, config):
221 - super(SNSEventSource, self).__init__(context, config)
222 - self._sns = kappa.awsclient.create_client('sns', context.session)
223 -
224 - def _make_notification_id(self, function_name):
225 - return 'Kappa-%s-notification' % function_name
226 -
227 - def exists(self, function):
228 - try:
229 - response = self._sns.call(
230 - 'list_subscriptions_by_topic',
231 - TopicArn=self.arn)
232 - LOG.debug(response)
233 - for subscription in response['Subscriptions']:
234 - if subscription['Endpoint'] == function.arn:
235 - return subscription
236 - return None
237 - except Exception:
238 - LOG.exception('Unable to find event source %s', self.arn)
239 -
240 - def add(self, function):
241 - try:
242 - response = self._sns.call(
243 - 'subscribe',
244 - TopicArn=self.arn, Protocol='lambda',
245 - Endpoint=function.arn)
246 - LOG.debug(response)
247 - except Exception:
248 - LOG.exception('Unable to add SNS event source')
249 -
250 - enable = add
251 -
252 - def update(self, function):
253 - self.add(function)
254 -
255 - def remove(self, function):
256 - LOG.debug('removing SNS event source')
257 - try:
258 - subscription = self.exists(function)
259 - if subscription:
260 - response = self._sns.call(
261 - 'unsubscribe',
262 - SubscriptionArn=subscription['SubscriptionArn'])
263 - LOG.debug(response)
264 - except Exception:
265 - LOG.exception('Unable to remove event source %s', self.arn)
266 -
267 - disable = remove
268 -
269 - def status(self, function):
270 - LOG.debug('status for SNS notification for %s', function.name)
271 - status = self.exists(function)
272 - if status:
273 - status['EventSourceArn'] = status['TopicArn']
274 - return status
......
1 +# -*- coding: utf-8 -*-
2 +# Copyright (c) 2014, 2015 Mitch Garnaat
3 +#
4 +# Licensed under the Apache License, Version 2.0 (the "License");
5 +# you may not use this file except in compliance with the License.
6 +# You may obtain a copy of the License at
7 +#
8 +# http://www.apache.org/licenses/LICENSE-2.0
9 +#
10 +# Unless required by applicable law or agreed to in writing, software
11 +# distributed under the License is distributed on an "AS IS" BASIS,
12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 +# See the License for the specific language governing permissions and
14 +# limitations under the License.
15 +
16 +import kappa.event_source.base
17 +import logging
18 +
19 +LOG = logging.getLogger(__name__)
20 +
21 +
22 +class S3EventSource(kappa.event_source.base.EventSource):
23 +
24 + def __init__(self, context, config):
25 + super(S3EventSource, self).__init__(context, config)
26 + self._s3 = kappa.awsclient.create_client('s3', context.session)
27 +
28 + def _make_notification_id(self, function_name):
29 + return 'Kappa-%s-notification' % function_name
30 +
31 + def _get_bucket_name(self):
32 + return self.arn.split(':')[-1]
33 +
34 + def add(self, function):
35 + notification_spec = {
36 + 'LambdaFunctionConfigurations': [
37 + {
38 + 'Id': self._make_notification_id(function.name),
39 + 'Events': [e for e in self._config['events']],
40 + 'LambdaFunctionArn': function.arn,
41 + }
42 + ]
43 + }
44 + try:
45 + response = self._s3.call(
46 + 'put_bucket_notification_configuration',
47 + Bucket=self._get_bucket_name(),
48 + NotificationConfiguration=notification_spec)
49 + LOG.debug(response)
50 + except Exception as exc:
51 + LOG.debug(exc.response)
52 + LOG.exception('Unable to add S3 event source')
53 +
54 + enable = add
55 +
56 + def update(self, function):
57 + self.add(function)
58 +
59 + def remove(self, function):
60 + LOG.debug('removing s3 notification')
61 + response = self._s3.call(
62 + 'get_bucket_notification',
63 + Bucket=self._get_bucket_name())
64 + LOG.debug(response)
65 + if 'CloudFunctionConfiguration' in response:
66 + fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
67 + if fn_arn == function.arn:
68 + del response['CloudFunctionConfiguration']
69 + del response['ResponseMetadata']
70 + response = self._s3.call(
71 + 'put_bucket_notification',
72 + Bucket=self._get_bucket_name(),
73 + NotificationConfiguration=response)
74 + LOG.debug(response)
75 +
76 + disable = remove
77 +
78 + def status(self, function):
79 + LOG.debug('status for s3 notification for %s', function.name)
80 + response = self._s3.call(
81 + 'get_bucket_notification',
82 + Bucket=self._get_bucket_name())
83 + LOG.debug(response)
84 + if 'CloudFunctionConfiguration' not in response:
85 + response = None
86 + return response
1 +# -*- coding: utf-8 -*-
2 +# Copyright (c) 2014, 2015 Mitch Garnaat
3 +#
4 +# Licensed under the Apache License, Version 2.0 (the "License");
5 +# you may not use this file except in compliance with the License.
6 +# You may obtain a copy of the License at
7 +#
8 +# http://www.apache.org/licenses/LICENSE-2.0
9 +#
10 +# Unless required by applicable law or agreed to in writing, software
11 +# distributed under the License is distributed on an "AS IS" BASIS,
12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 +# See the License for the specific language governing permissions and
14 +# limitations under the License.
15 +
16 +import kappa.awsclient
17 +import kappa.event_source.base
18 +import logging
19 +
20 +LOG = logging.getLogger(__name__)
21 +
22 +
23 +class SNSEventSource(kappa.event_source.base.EventSource):
24 +
25 + def __init__(self, context, config):
26 + super(SNSEventSource, self).__init__(context, config)
27 + self._sns = kappa.awsclient.create_client('sns', context.session)
28 +
29 + def _make_notification_id(self, function_name):
30 + return 'Kappa-%s-notification' % function_name
31 +
32 + def exists(self, function):
33 + try:
34 + response = self._sns.call(
35 + 'list_subscriptions_by_topic',
36 + TopicArn=self.arn)
37 + LOG.debug(response)
38 + for subscription in response['Subscriptions']:
39 + if subscription['Endpoint'] == function.arn:
40 + return subscription
41 + return None
42 + except Exception:
43 + LOG.exception('Unable to find event source %s', self.arn)
44 +
45 + def add(self, function):
46 + try:
47 + response = self._sns.call(
48 + 'subscribe',
49 + TopicArn=self.arn, Protocol='lambda',
50 + Endpoint=function.arn)
51 + LOG.debug(response)
52 + except Exception:
53 + LOG.exception('Unable to add SNS event source')
54 +
55 + enable = add
56 +
57 + def update(self, function):
58 + self.add(function)
59 +
60 + def remove(self, function):
61 + LOG.debug('removing SNS event source')
62 + try:
63 + subscription = self.exists(function)
64 + if subscription:
65 + response = self._sns.call(
66 + 'unsubscribe',
67 + SubscriptionArn=subscription['SubscriptionArn'])
68 + LOG.debug(response)
69 + except Exception:
70 + LOG.exception('Unable to remove event source %s', self.arn)
71 +
72 + disable = remove
73 +
74 + def status(self, function):
75 + LOG.debug('status for SNS notification for %s', function.name)
76 + status = self.exists(function)
77 + if status:
78 + status['EventSourceArn'] = status['TopicArn']
79 + return status