Mitch Garnaat

More WIP changes to get current with GA release of Lambda.

......@@ -65,6 +65,24 @@ def invoke(ctx):
@cli.command()
@click.pass_context
def dryrun(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('invoking dryrun...')
response = context.dryrun()
click.echo(response)
click.echo('...done')
@cli.command()
@click.pass_context
def invoke_async(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('invoking async...')
response = context.invoke_async()
click.echo(response)
click.echo('...done')
@cli.command()
@click.pass_context
def tail(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('tailing logs...')
......@@ -124,6 +142,14 @@ def add_event_sources(ctx):
context.add_event_sources()
click.echo('...done')
@cli.command()
@click.pass_context
def update_event_sources(ctx):
context = Context(ctx.obj['config'], ctx.obj['debug'])
click.echo('updating event sources...')
context.update_event_sources()
click.echo('...done')
if __name__ == '__main__':
cli(obj={})
......
......@@ -122,6 +122,10 @@ class Context(object):
for event_source in self.event_sources:
event_source.add(self.function)
def update_event_sources(self):
for event_source in self.event_sources:
event_source.update(self.function)
def create(self):
if self.policy:
self.policy.create()
......@@ -140,6 +144,12 @@ class Context(object):
def invoke(self):
return self.function.invoke()
def dryrun(self):
return self.function.dryrun()
def invoke_async(self):
return self.function.invoke_async()
def tail(self):
return self.function.tail()
......
......@@ -38,6 +38,10 @@ class EventSource(object):
def batch_size(self):
return self._config.get('batch_size', 100)
@property
def enabled(self):
return self._config.get('enabled', True)
class KinesisEventSource(EventSource):
......@@ -62,10 +66,25 @@ class KinesisEventSource(EventSource):
FunctionName=function.name,
EventSourceArn=self.arn,
BatchSize=self.batch_size,
StartingPosition=self.starting_position)
StartingPosition=self.starting_position,
Enabled=self.enabled
)
LOG.debug(response)
except Exception:
LOG.exception('Unable to add Kinesis event source')
LOG.exception('Unable to add event source')
def update(self, function):
response = None
uuid = self._get_uuid(function)
if uuid:
try:
response = self._lambda.update_event_source_mapping(
BatchSize=self.batch_size,
Enabled=self.enabled,
FunctionName=function.arn)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update event source')
def remove(self, function):
response = None
......
......@@ -207,14 +207,24 @@ class Function(object):
InvokeArgs=fp)
LOG.debug(response)
def invoke(self, test_data=None):
def _invoke(self, test_data, invocation_type):
if test_data is None:
test_data = self.test_data
LOG.debug('invoke %s', test_data)
with open(test_data) as fp:
response = self._lambda_svc.invoke(
FunctionName=self.name,
InvocationType=invocation_type,
LogType='Tail',
Payload=fp.read())
LOG.debug(response)
return response
def invoke(self, test_data=None):
return self._invoke(test_data, 'RequestResponse')
def invoke_async(self, test_data=None):
return self._invoke(test_data, 'Event')
def dryrun(self, test_data=None):
return self._invoke(test_data, 'DryRun')
......