Mitch Garnaat

Another WIP commit on the road to an update for the new Lambda API.

......@@ -41,12 +41,20 @@ def cli(ctx, config=None, debug=False):
@click.pass_context
def create(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('deploying...')
click.echo('creating...')
context.create()
click.echo('...done')
@cli.command()
@click.pass_context
def update_code(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('updating code...')
context.update_code()
click.echo('...done')
@cli.command()
@click.pass_context
def invoke(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('invoking...')
......@@ -93,6 +101,7 @@ def status(ctx):
click.echo(click.style('Event Sources', bold=True))
if status['event_sources']:
for event_source in status['event_sources']:
if event_source:
line = ' {}: {}'.format(
event_source['EventSourceArn'], event_source['State'])
click.echo(click.style(line, fg='green'))
......
......@@ -13,6 +13,7 @@
import logging
import yaml
import time
import kappa.function
import kappa.event_source
......@@ -107,8 +108,12 @@ class Context(object):
self, event_source_cfg))
elif svc == 'sns':
self.event_sources.append(
kappa.event_source.SNSEventSource(self,
event_source_cfg))
kappa.event_source.SNSEventSource(
self, event_source_cfg))
elif svc == 'dynamodb':
self.event_sources.append(
kappa.event_source.DynamoDBStreamEventSource(
self, event_source_cfg))
else:
msg = 'Unknown event source: %s' % event_source_cfg['arn']
raise ValueError(msg)
......@@ -122,8 +127,16 @@ class Context(object):
self.policy.create()
if self.role:
self.role.create()
# There is a consistency problem here.
# If you don't wait for a bit, the function.create call
# will fail because the policy has not been attached to the role.
LOG.debug('Waiting for policy/role propogation')
time.sleep(5)
self.function.create()
def update_code(self):
self.function.update()
def invoke(self):
return self.function.invoke()
......@@ -131,13 +144,15 @@ class Context(object):
return self.function.tail()
def delete(self):
if self.policy:
self.policy.delete()
if self.role:
self.role.delete()
self.function.delete()
for event_source in self.event_sources:
event_source.remove(self.function)
self.function.delete()
time.sleep(5)
if self.role:
self.role.delete()
time.sleep(5)
if self.policy:
self.policy.delete()
def status(self):
status = {}
......
......@@ -77,7 +77,10 @@ class KinesisEventSource(EventSource):
return response
def status(self, function):
response = None
LOG.debug('getting status for event source %s', self.arn)
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.get_event_source_mapping(
UUID=self._get_uuid(function))
......@@ -85,9 +88,16 @@ class KinesisEventSource(EventSource):
except ClientError:
LOG.debug('event source %s does not exist', self.arn)
response = None
else:
LOG.debug('No UUID for event source %s', self.arn)
return response
class DynamoDBStreamEventSource(KinesisEventSource):
pass
class S3EventSource(EventSource):
def __init__(self, context, config):
......
......@@ -148,6 +148,7 @@ class Function(object):
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
exec_role = self._context.exec_role_arn
LOG.debug('exec_role=%s', exec_role)
try:
zipdata = fp.read()
response = self._lambda_svc.create_function(
......@@ -164,10 +165,27 @@ class Function(object):
LOG.exception('Unable to upload zip file')
self.add_permissions()
def update(self):
LOG.debug('updating %s', self.zipfile_name)
self.zip_lambda_function(self.zipfile_name, self.path)
with open(self.zipfile_name, 'rb') as fp:
try:
zipdata = fp.read()
response = self._lambda_svc.update_function_code(
FunctionName=self.name,
ZipFile=zipdata)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update zip file')
def delete(self):
LOG.debug('deleting function %s', self.name)
response = None
try:
response = self._lambda_svc.delete_function(FunctionName=self.name)
LOG.debug(response)
except ClientError:
LOG.debug('function %s: not found', self.name)
return response
def status(self):
......
......@@ -77,6 +77,8 @@ class Policy(object):
LOG.exception('Error creating Policy')
def delete(self):
response = None
if self.arn:
LOG.debug('deleting policy %s', self.name)
response = self._iam_svc.delete_policy(PolicyArn=self.arn)
LOG.debug(response)
......
......@@ -79,13 +79,28 @@ class Role(object):
Path=self.Path, RoleName=self.name,
AssumeRolePolicyDocument=AssumeRolePolicyDocument)
LOG.debug(response)
except Exception:
if self._context.policy:
response = self._iam_svc.attach_role_policy(
RoleName=self.name,
PolicyArn=self._context.policy.arn)
LOG.debug(response)
except ClientError:
LOG.exception('Error creating Role')
def delete(self):
response = None
LOG.debug('deleting role %s', self.name)
try:
LOG.debug('First detach the policy from the role')
policy_arn = self._context.policy.arn
if policy_arn:
response = self._iam_svc.detach_role_policy(
RoleName=self.name, PolicyArn=policy_arn)
LOG.debug(response)
response = self._iam_svc.delete_role(RoleName=self.name)
LOG.debug(response)
except ClientError:
LOG.exception('role %s not found', self.name)
return response
def status(self):
......