James Cooper

Remove event_source.py - accidentally re-added when I rebased

1 -# Copyright (c) 2014, 2015 Mitch Garnaat
2 -#
3 -# Licensed under the Apache License, Version 2.0 (the "License");
4 -# you may not use this file except in compliance with the License.
5 -# You may obtain a copy of the License at
6 -#
7 -# http://www.apache.org/licenses/LICENSE-2.0
8 -#
9 -# Unless required by applicable law or agreed to in writing, software
10 -# distributed under the License is distributed on an "AS IS" BASIS,
11 -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 -# See the License for the specific language governing permissions and
13 -# limitations under the License.
14 -
15 -import logging
16 -import uuid
17 -
18 -from botocore.exceptions import ClientError
19 -
20 -import kappa.awsclient
21 -
22 -LOG = logging.getLogger(__name__)
23 -
24 -
25 -class EventSource(object):
26 -
27 - def __init__(self, context, config):
28 - self._context = context
29 - self._config = config
30 -
31 - @property
32 - def arn(self):
33 - return self._config['arn']
34 -
35 - @property
36 - def starting_position(self):
37 - return self._config.get('starting_position', 'LATEST')
38 -
39 - @property
40 - def batch_size(self):
41 - return self._config.get('batch_size', 100)
42 -
43 - @property
44 - def enabled(self):
45 - return self._config.get('enabled', False)
46 -
47 -
48 -class KinesisEventSource(EventSource):
49 -
50 - def __init__(self, context, config):
51 - super(KinesisEventSource, self).__init__(context, config)
52 - self._lambda = kappa.awsclient.create_client(
53 - 'lambda', context.session)
54 -
55 - def _get_uuid(self, function):
56 - uuid = None
57 - response = self._lambda.call(
58 - 'list_event_source_mappings',
59 - FunctionName=function.name,
60 - EventSourceArn=self.arn)
61 - LOG.debug(response)
62 - if len(response['EventSourceMappings']) > 0:
63 - uuid = response['EventSourceMappings'][0]['UUID']
64 - return uuid
65 -
66 - def add(self, function):
67 - try:
68 - response = self._lambda.call(
69 - 'create_event_source_mapping',
70 - FunctionName=function.name,
71 - EventSourceArn=self.arn,
72 - BatchSize=self.batch_size,
73 - StartingPosition=self.starting_position,
74 - Enabled=self.enabled
75 - )
76 - LOG.debug(response)
77 - except Exception:
78 - LOG.exception('Unable to add event source')
79 -
80 - def enable(self, function):
81 - self._config['enabled'] = True
82 - try:
83 - response = self._lambda.call(
84 - 'update_event_source_mapping',
85 - UUID=self._get_uuid(function),
86 - Enabled=self.enabled
87 - )
88 - LOG.debug(response)
89 - except Exception:
90 - LOG.exception('Unable to enable event source')
91 -
92 - def disable(self, function):
93 - self._config['enabled'] = False
94 - try:
95 - response = self._lambda.call(
96 - 'update_event_source_mapping',
97 - FunctionName=function.name,
98 - Enabled=self.enabled
99 - )
100 - LOG.debug(response)
101 - except Exception:
102 - LOG.exception('Unable to disable event source')
103 -
104 - def update(self, function):
105 - response = None
106 - uuid = self._get_uuid(function)
107 - if uuid:
108 - try:
109 - response = self._lambda.call(
110 - 'update_event_source_mapping',
111 - BatchSize=self.batch_size,
112 - Enabled=self.enabled,
113 - FunctionName=function.arn)
114 - LOG.debug(response)
115 - except Exception:
116 - LOG.exception('Unable to update event source')
117 -
118 - def remove(self, function):
119 - response = None
120 - uuid = self._get_uuid(function)
121 - if uuid:
122 - response = self._lambda.call(
123 - 'delete_event_source_mapping',
124 - UUID=uuid)
125 - LOG.debug(response)
126 - return response
127 -
128 - def status(self, function):
129 - response = None
130 - LOG.debug('getting status for event source %s', self.arn)
131 - uuid = self._get_uuid(function)
132 - if uuid:
133 - try:
134 - response = self._lambda.call(
135 - 'get_event_source_mapping',
136 - UUID=self._get_uuid(function))
137 - LOG.debug(response)
138 - except ClientError:
139 - LOG.debug('event source %s does not exist', self.arn)
140 - response = None
141 - else:
142 - LOG.debug('No UUID for event source %s', self.arn)
143 - return response
144 -
145 -
146 -class DynamoDBStreamEventSource(KinesisEventSource):
147 -
148 - pass
149 -
150 -
151 -class S3EventSource(EventSource):
152 -
153 - def __init__(self, context, config):
154 - super(S3EventSource, self).__init__(context, config)
155 - self._s3 = kappa.awsclient.create_client('s3', context.session)
156 -
157 - def _make_notification_id(self, function_name):
158 - return 'Kappa-%s-notification' % function_name
159 -
160 - def _get_bucket_name(self):
161 - return self.arn.split(':')[-1]
162 -
163 - def add(self, function):
164 - notification_spec = {
165 - 'LambdaFunctionConfigurations': [
166 - {
167 - 'Id': self._make_notification_id(function.name),
168 - 'Events': [e for e in self._config['events']],
169 - 'LambdaFunctionArn': function.arn,
170 - }
171 - ]
172 - }
173 - try:
174 - response = self._s3.call(
175 - 'put_bucket_notification_configuration',
176 - Bucket=self._get_bucket_name(),
177 - NotificationConfiguration=notification_spec)
178 - LOG.debug(response)
179 - except Exception as exc:
180 - LOG.debug(exc.response)
181 - LOG.exception('Unable to add S3 event source')
182 -
183 - enable = add
184 -
185 - def update(self, function):
186 - self.add(function)
187 -
188 - def remove(self, function):
189 - LOG.debug('removing s3 notification')
190 - response = self._s3.call(
191 - 'get_bucket_notification',
192 - Bucket=self._get_bucket_name())
193 - LOG.debug(response)
194 - if 'CloudFunctionConfiguration' in response:
195 - fn_arn = response['CloudFunctionConfiguration']['CloudFunction']
196 - if fn_arn == function.arn:
197 - del response['CloudFunctionConfiguration']
198 - del response['ResponseMetadata']
199 - response = self._s3.call(
200 - 'put_bucket_notification',
201 - Bucket=self._get_bucket_name(),
202 - NotificationConfiguration=response)
203 - LOG.debug(response)
204 -
205 - disable = remove
206 -
207 - def status(self, function):
208 - LOG.debug('status for s3 notification for %s', function.name)
209 - response = self._s3.call(
210 - 'get_bucket_notification',
211 - Bucket=self._get_bucket_name())
212 - LOG.debug(response)
213 - if 'CloudFunctionConfiguration' not in response:
214 - response = None
215 - return response
216 -
217 -
218 -class SNSEventSource(EventSource):
219 -
220 - def __init__(self, context, config):
221 - super(SNSEventSource, self).__init__(context, config)
222 - self._sns = kappa.awsclient.create_client('sns', context.session)
223 -
224 - def _make_notification_id(self, function_name):
225 - return 'Kappa-%s-notification' % function_name
226 -
227 - def exists(self, function):
228 - try:
229 - response = self._sns.call(
230 - 'list_subscriptions_by_topic',
231 - TopicArn=self.arn)
232 - LOG.debug(response)
233 - for subscription in response['Subscriptions']:
234 - if subscription['Endpoint'] == function.arn:
235 - return subscription
236 - return None
237 - except Exception:
238 - LOG.exception('Unable to find event source %s', self.arn)
239 -
240 - def add(self, function):
241 - try:
242 - response = self._sns.call(
243 - 'subscribe',
244 - TopicArn=self.arn, Protocol='lambda',
245 - Endpoint=function.arn)
246 - LOG.debug(response)
247 - except Exception:
248 - LOG.exception('Unable to add SNS event source')
249 -
250 - enable = add
251 -
252 - def update(self, function):
253 - self.add(function)
254 -
255 - def remove(self, function):
256 - LOG.debug('removing SNS event source')
257 - try:
258 - subscription = self.exists(function)
259 - if subscription:
260 - response = self._sns.call(
261 - 'unsubscribe',
262 - SubscriptionArn=subscription['SubscriptionArn'])
263 - LOG.debug(response)
264 - except Exception:
265 - LOG.exception('Unable to remove event source %s', self.arn)
266 -
267 - disable = remove
268 -
269 - def status(self, function):
270 - LOG.debug('status for SNS notification for %s', function.name)
271 - status = self.exists(function)
272 - if status:
273 - status['EventSourceArn'] = status['TopicArn']
274 - return status
275 -
276 -class CloudWatchEventSource(EventSource):
277 -
278 - def __init__(self, context, config):
279 - super(CloudWatchEventSource, self).__init__(context, config)
280 - self._events = kappa.awsclient.create_client('events', context.session)
281 - self._lambda = kappa.awsclient.create_client('lambda', context.session)
282 - self._name = config['name']
283 - self._context = context
284 - self._config = config
285 -
286 - def find(self):
287 - response = self._events.call('list_rules', NamePrefix=self._name)
288 - LOG.debug(response)
289 - if 'Rules' in response:
290 - for r in response['Rules']:
291 - if r['Name'] == self._name:
292 - return r
293 - return None
294 -
295 - def add(self, function):
296 - kwargs = {
297 - 'Name': self._name,
298 - 'State': 'ENABLED' if self.enabled else 'DISABLED'
299 - }
300 - if 'schedule' in self._config:
301 - kwargs['ScheduleExpression'] = self._config['schedule']
302 - if 'pattern' in self._config:
303 - kwargs['EventPattern'] = self._config['pattern']
304 - if 'description' in self._config:
305 - kwargs['Description'] = self._config['description']
306 - if 'role_arn' in self._config:
307 - kwargs['RoleArn'] = self._config['role_arn']
308 - try:
309 - response = self._events.call('put_rule', **kwargs)
310 - LOG.debug(response)
311 - self._config['arn'] = response['RuleArn']
312 - self._lambda.call('add_permission',
313 - FunctionName=function.name,
314 - StatementId=str(uuid.uuid4()),
315 - Action='lambda:InvokeFunction',
316 - Principal='events.amazonaws.com',
317 - SourceArn=response['RuleArn'])
318 - response = self._events.call('put_targets',
319 - Rule=self._name,
320 - Targets=[{
321 - 'Id': '1',
322 - 'Arn': function.arn
323 - }])
324 - LOG.debug(response)
325 - except Exception:
326 - LOG.exception('Unable to put CloudWatch event source')
327 -
328 - def update(self, function):
329 - self.add(function)
330 -
331 - def remove(self, function):
332 - LOG.debug('removing CloudWatch event source')
333 - try:
334 - rule = self.find()
335 - if rule:
336 - response = self._events.call(
337 - 'delete_rule',
338 - Name=self._name)
339 - LOG.debug(response)
340 - except Exception:
341 - LOG.exception('Unable to remove CloudWatch event source %s', self._name)
342 -
343 - def status(self, function):
344 - LOG.debug('status for CloudWatch event for %s', function.name)
345 - return self._to_status(self.find())
346 -
347 - def enable(self, function):
348 - if self.find():
349 - self._events.call('enable_rule', Name=self._name)
350 -
351 - def disable(self, function):
352 - if self.find():
353 - self._events.call('disable_rule', Name=self._name)
354 -
355 - def _to_status(self, rule):
356 - if rule:
357 - return {
358 - 'EventSourceArn': rule['Arn'],
359 - 'State': rule['State']
360 - }
361 - else:
362 - return None