Mitch Garnaat

Getting event sources working again. Lots of other changes.

......@@ -2,4 +2,4 @@ include README.md
include LICENSE
include requirements.txt
include kappa/_version
recursive-include samples *.js *.yml *.cf *.json
recursive-include samples *.js *.py *.yml *.cf *.json *.txt
......
......@@ -38,6 +38,10 @@ class AWSClient(object):
def session(self):
return self._session
@property
def region_name(self):
return self.client.meta.region_name
def _create_client(self):
client = self._session.client(self._service_name)
return client
......
......@@ -19,6 +19,7 @@ import os
import shutil
import kappa.function
import kappa.restapi
import kappa.event_source
import kappa.policy
import kappa.role
......@@ -55,6 +56,11 @@ class Context(object):
self, self.config['environments'][self.environment])
self.function = kappa.function.Function(
self, self.config['lambda'])
if 'restapi' in self.config:
self.restapi = kappa.restapi.RestApi(
self, self.config['restapi'])
else:
self.restapi = None
self.event_sources = []
self._create_event_sources()
......@@ -82,7 +88,8 @@ class Context(object):
return self.cache.setdefault(self.environment, dict()).get(key)
def set_cache_value(self, key, value):
self.cache.setdefault(self.environment, dict())[key] = value.encode('utf-8')
self.cache.setdefault(
self.environment, dict())[key] = value.encode('utf-8')
self._save_cache()
@property
......@@ -149,8 +156,9 @@ class Context(object):
log.addHandler(ch)
def _create_event_sources(self):
if 'event_sources' in self.config['lambda']:
for event_source_cfg in self.config['lambda']['event_sources']:
env_cfg = self.config['environments'][self.environment]
if 'event_sources' in env_cfg:
for event_source_cfg in env_cfg['event_sources']:
_, _, svc, _ = event_source_cfg['arn'].split(':', 3)
if svc == 'kinesis':
self.event_sources.append(
......@@ -179,6 +187,14 @@ class Context(object):
for event_source in self.event_sources:
event_source.update(self.function)
def enable_event_sources(self):
for event_source in self.event_sources:
event_source.enable(self.function)
def disable_event_sources(self):
for event_source in self.event_sources:
event_source.enable(self.function)
def create(self):
if self.policy:
self.policy.create()
......@@ -197,6 +213,8 @@ class Context(object):
if self.role:
self.role.create()
self.function.deploy()
if self.restapi:
self.restapi.deploy()
def invoke(self, data):
return self.function.invoke(data)
......@@ -227,6 +245,8 @@ class Context(object):
event_source.remove(self.function)
self.function.log.delete()
self.function.delete()
if self.restapi:
self.restapi.delete()
time.sleep(5)
if self.role:
self.role.delete()
......
......@@ -49,7 +49,7 @@ class KinesisEventSource(EventSource):
def __init__(self, context, config):
super(KinesisEventSource, self).__init__(context, config)
self._lambda = kappa.awsclient.create_client(
'kinesis', context.session)
'lambda', context.session)
def _get_uuid(self, function):
uuid = None
......@@ -77,7 +77,7 @@ class KinesisEventSource(EventSource):
LOG.exception('Unable to add event source')
def enable(self, function):
self.enabled = True
self._config['enabled'] = True
try:
response = self._lambda.call(
'update_event_source_mapping',
......@@ -89,7 +89,7 @@ class KinesisEventSource(EventSource):
LOG.exception('Unable to enable event source')
def disable(self, function):
self.enabled = False
self._config['enabled'] = False
try:
response = self._lambda.call(
'update_event_source_mapping',
......
......@@ -18,6 +18,7 @@ import zipfile
import time
import shutil
import hashlib
import uuid
from botocore.exceptions import ClientError
......@@ -89,6 +90,10 @@ class Function(object):
return self._get_response_configuration('FunctionArn')
@property
def alias_arn(self):
return self.arn + ':{}'.format(self._context.environment)
@property
def repository_type(self):
return self._get_response_code('RepositoryType')
......@@ -100,6 +105,12 @@ class Function(object):
def version(self):
return self._get_response_configuration('Version')
@property
def deployment_uri(self):
return 'https://{}.execute-api.{}.amazonaws.com/{}'.format(
self.api_id, self._apigateway_client.region_name,
self._context.environment)
def _get_response(self):
if self._response is None:
try:
......@@ -217,12 +228,11 @@ class Function(object):
try:
response = self._lambda_client.call(
'list_aliases',
FunctionName=self.name,
FunctionVersion=self.version)
FunctionName=self.name)
LOG.debug(response)
except Exception:
LOG.exception('Unable to list aliases')
return response['Versions']
return response.get('Versions', list())
def find_latest_version(self):
# Find the current (latest) version by version number
......@@ -271,20 +281,17 @@ class Function(object):
except Exception:
LOG.exception('Unable to update alias')
def add_permissions(self):
if self.permissions:
time.sleep(5)
for permission in self.permissions:
def add_permission(self, action, principal,
source_arn=None, source_account=None):
try:
kwargs = {
'FunctionName': self.name,
'StatementId': permission['statement_id'],
'Action': permission['action'],
'Principal': permission['principal']}
source_arn = permission.get('source_arn', None)
'Qualifier': self._context.environment,
'StatementId': str(uuid.uuid4()),
'Action': action,
'Principal': principal}
if source_arn:
kwargs['SourceArn'] = source_arn
source_account = permission.get('source_account', None)
if source_account:
kwargs['SourceAccount'] = source_account
response = self._lambda_client.call(
......@@ -293,6 +300,16 @@ class Function(object):
except Exception:
LOG.exception('Unable to add permission')
def add_permissions(self):
if self.permissions:
time.sleep(5)
for permission in self.permissions:
self.add_permission(
permission['action'],
permission['principal'],
permission.get('source_arn'),
permission.get('source_account'))
def create(self):
LOG.info('creating function %s', self.name)
self._check_function_md5()
......@@ -415,15 +432,6 @@ class Function(object):
response = None
return response
def invoke_asynch(self, data_file):
LOG.debug('_invoke_async %s', data_file)
with open(data_file) as fp:
response = self._lambda_client.call(
'invoke_async',
FunctionName=self.name,
InvokeArgs=fp)
LOG.debug(response)
def _invoke(self, data, invocation_type):
LOG.debug('invoke %s as %s', self.name, invocation_type)
response = self._lambda_client.call(
......
......@@ -89,7 +89,8 @@ class Policy(object):
PolicyArn=self.arn)
except Exception:
LOG.exception('Error listing policy versions')
return response['Versions']
response = {}
return response.get('Versions', list())
def exists(self):
for policy in self._find_all_policies():
......