Jose Diaz-Gonzalez

simplify event source retrieval

...@@ -22,10 +22,10 @@ import sys ...@@ -22,10 +22,10 @@ import sys
22 22
23 import kappa.function 23 import kappa.function
24 import kappa.restapi 24 import kappa.restapi
25 -import kappa.event_source.dynamodb_stream as dynamodb_stream 25 +import kappa.event_source.dynamodb_stream
26 -import kappa.event_source.kinesis as kinesis 26 +import kappa.event_source.kinesis
27 -import kappa.event_source.s3 as s3 27 +import kappa.event_source.s3
28 -import kappa.event_source.sns as sns 28 +import kappa.event_source.sns
29 import kappa.policy 29 import kappa.policy
30 import kappa.role 30 import kappa.role
31 import kappa.awsclient 31 import kappa.awsclient
...@@ -168,27 +168,27 @@ class Context(object): ...@@ -168,27 +168,27 @@ class Context(object):
168 168
169 def _create_event_sources(self): 169 def _create_event_sources(self):
170 env_cfg = self.config['environments'][self.environment] 170 env_cfg = self.config['environments'][self.environment]
171 - if 'event_sources' in env_cfg: 171 + if 'event_sources' not in env_cfg:
172 + return
173 +
172 event_sources = env_cfg.get('event_sources', {}) 174 event_sources = env_cfg.get('event_sources', {})
173 if not event_sources: 175 if not event_sources:
174 return 176 return
177 +
178 + event_source_map = {
179 + 'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource,
180 + 'kinesis': kappa.event_source.kinesis.KinesisEventSource,
181 + 's3': kappa.event_source.s3.S3EventSource,
182 + 'sns': kappa.event_source.sns.SNSEventSource,
183 + }
175 for event_source_cfg in event_sources: 184 for event_source_cfg in event_sources:
176 _, _, svc, _ = event_source_cfg['arn'].split(':', 3) 185 _, _, svc, _ = event_source_cfg['arn'].split(':', 3)
177 - if svc == 'kinesis': 186 + event_source = event_source_map.get(svc, None)
178 - self.event_sources.append( 187 + if not event_source:
179 - kinesis.KinesisEventSource(self, event_source_cfg)) 188 + raise ValueError('Unknown event source: {0}'.format(
180 - elif svc == 's3': 189 + event_source_cfg['arn']))
181 self.event_sources.append( 190 self.event_sources.append(
182 - s3.S3EventSource(self, event_source_cfg)) 191 + event_source(self, event_source_cfg))
183 - elif svc == 'sns':
184 - self.event_sources.append(
185 - sns.SNSEventSource(self, event_source_cfg))
186 - elif svc == 'dynamodb':
187 - self.event_sources.append(
188 - dynamodb_stream.DynamoDBStreamEventSource(self, event_source_cfg))
189 - else:
190 - msg = 'Unknown event source: %s' % event_source_cfg['arn']
191 - raise ValueError(msg)
192 192
193 def add_event_sources(self): 193 def add_event_sources(self):
194 for event_source in self.event_sources: 194 for event_source in self.event_sources:
......