Mitch Garnaat

Another WIP commit. Major changes in the CLI. Also much better detection of ch…

…anges (or no changes) in the code, configuration, policies, etc. when deploying.  An attempt to incorporate a test runner that will run unit tests associated with the Lambda function.
# Copyright (c) 2014,2015 Mitch Garnaat http://garnaat.org/
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import boto3
class __AWS(object):
def __init__(self, profile_name=None, region_name=None):
self._client_cache = {}
self._session = boto3.session.Session(
region_name=region_name, profile_name=profile_name)
def create_client(self, client_name):
if client_name not in self._client_cache:
self._client_cache[client_name] = self._session.client(
client_name)
return self._client_cache[client_name]
__Singleton_AWS = None
def get_aws(context):
global __Singleton_AWS
if __Singleton_AWS is None:
__Singleton_AWS = __AWS(context.profile, context.region)
return __Singleton_AWS
......@@ -13,33 +13,22 @@
# limitations under the License.
import logging
import json
import os
import datetime
import jmespath
import boto3
import placebo
LOG = logging.getLogger(__name__)
def json_encoder(obj):
"""JSON encoder that formats datetimes as ISO8601 format."""
if isinstance(obj, datetime.datetime):
return obj.isoformat()
else:
return obj
class AWSClient(object):
def __init__(self, service_name, region_name, profile_name,
record_path=None):
def __init__(self, service_name, region_name, profile_name):
self._service_name = service_name
self._region_name = region_name
self._profile_name = profile_name
self._record_path = record_path
self._client = self._create_client()
@property
......@@ -54,43 +43,12 @@ class AWSClient(object):
def profile_name(self):
return self._profile_name
def _record(self, op_name, kwargs, data):
"""
This is a little hack to enable easier unit testing of the code.
Since botocore/boto3 has its own set of tests, I'm not interested in
trying to test it again here. So, this recording capability allows
us to save the data coming back from botocore as JSON files which
can then be used by the mocked awsclient in the unit test directory.
To enable this, pass in a record_path to the contructor and the JSON
data files will get stored in this path.
"""
if self._record_path:
path = os.path.expanduser(self._record_path)
path = os.path.expandvars(path)
path = os.path.join(path, self.service_name)
if not os.path.isdir(path):
os.mkdir(path)
path = os.path.join(path, self.region_name)
if not os.path.isdir(path):
os.mkdir(path)
path = os.path.join(path, self.account_id)
if not os.path.isdir(path):
os.mkdir(path)
filename = op_name
if kwargs:
for k, v in kwargs.items():
if k != 'query':
filename += '_{}_{}'.format(k, v)
filename += '.json'
path = os.path.join(path, filename)
with open(path, 'wb') as fp:
json.dump(data, fp, indent=4, default=json_encoder,
ensure_ascii=False)
def _create_client(self):
session = boto3.session.Session(
region_name=self._region_name, profile_name=self._profile_name)
return session.client(self._service_name)
placebo.attach(session)
client = session.client(self._service_name)
return client
def call(self, op_name, query=None, **kwargs):
"""
......@@ -131,20 +89,36 @@ class AWSClient(object):
data = op(**kwargs)
if query:
data = query.search(data)
self._record(op_name, kwargs, data)
return data
_client_cache = {}
def save_recordings(recording_path):
for key in _client_cache:
client = _client_cache[key]
full_path = os.path.join(recording_path, '{}.json'.format(key))
client._client.meta.placebo.save(full_path)
def create_client(service_name, context):
global _client_cache
client_key = '{}:{}:{}'.format(service_name, context.region,
context.profile)
if client_key not in _client_cache:
_client_cache[client_key] = AWSClient(service_name,
context.region,
context.profile,
context.record_path)
client = AWSClient(service_name, context.region,
context.profile)
if 'placebo' in context.config:
placebo_cfg = context.config['placebo']
if placebo_cfg.get('mode') == 'play':
full_path = os.path.join(
placebo_cfg['recording_path'],
'{}.json'.format(client_key))
if os.path.exists(full_path):
client._client.meta.placebo.load(full_path)
client._client.meta.placebo.start()
elif placebo_cfg['mode'] == 'record':
client._client.meta.placebo.record()
_client_cache[client_key] = client
return _client_cache[client_key]
......
......@@ -16,6 +16,7 @@ import logging
import yaml
import time
import os
import shutil
import kappa.function
import kappa.event_source
......@@ -57,26 +58,28 @@ class Context(object):
with open(cache_file, 'rb') as fp:
self.cache = yaml.load(fp)
def save_cache(self):
def _delete_cache(self):
if os.path.isdir('.kappa'):
shutil.rmtree('.kappa')
self.cache = {}
def _save_cache(self):
if not os.path.isdir('.kappa'):
os.mkdir('.kappa')
cache_file = os.path.join('.kappa', 'cache')
with open(cache_file, 'wb') as fp:
yaml.dump(self.cache, fp)
@property
def name(self):
return '{}-{}-v{}'.format(self.base_name,
self.environment,
self.version)
def get_cache_value(self, key):
return self.cache.setdefault(self.environment, dict()).get(key)
@property
def base_name(self):
return self.config.get('base_name')
def set_cache_value(self, key, value):
self.cache.setdefault(self.environment, dict())[key] = value
self._save_cache()
@property
def version(self):
return self.config.get('version')
def name(self):
return '{}-{}'.format(self.config['name'], self.environment)
@property
def profile(self):
......@@ -87,14 +90,23 @@ class Context(object):
return self.config['environments'][self.environment]['region']
@property
def record_path(self):
return self.config.get('record_path')
def record(self):
return self.config.get('record', False)
@property
def lambda_config(self):
return self.config.get('lambda')
@property
def test_dir(self):
return self.config.get('tests', '_tests')
@property
def unit_test_runner(self):
return self.config.get('unit_test_runner',
'nosetests . ../_tests/unit/')
@property
def exec_role_arn(self):
return self.role.arn
......@@ -179,14 +191,20 @@ class Context(object):
time.sleep(5)
self.function.deploy()
def update_code(self):
self.function.update()
def invoke(self, data):
return self.function.invoke(data)
def invoke(self):
return self.function.invoke()
def unit_tests(self):
# run any unit tests
unit_test_path = os.path.join(self.test_dir, 'unit')
if os.path.exists(unit_test_path):
os.chdir(self.function.path)
print('running unit tests')
pipe = os.popen(self.unit_test_runner, 'r')
print(pipe.read())
def test(self):
return self.function.invoke()
return self.unit_tests()
def dryrun(self):
return self.function.dryrun()
......@@ -197,6 +215,9 @@ class Context(object):
def tail(self):
return self.function.tail()
def tag(self, name, description):
return self.function.tag(name, description)
def delete(self):
for event_source in self.event_sources:
event_source.remove(self.function)
......@@ -208,6 +229,7 @@ class Context(object):
time.sleep(5)
if self.policy:
self.policy.delete()
self._delete_cache()
def status(self):
status = {}
......
......@@ -17,6 +17,7 @@ import os
import zipfile
import time
import shutil
import hashlib
from botocore.exceptions import ClientError
......@@ -66,11 +67,11 @@ class Function(object):
@property
def path(self):
return self._config['path']
return self._config.get('path', '_src')
@property
def test_data(self):
return self._config['test_data']
def tests(self):
return self._config.get('tests', '_tests')
@property
def permissions(self):
......@@ -87,34 +88,79 @@ class Function(object):
LOG.debug('Unable to find ARN for function: %s', self.name)
return self._response
@property
def code_sha_256(self):
def _get_response_configuration(self, key, default=None):
value = None
response = self._get_response()
return response['Configuration']['CodeSha256']
if response:
if 'Configuration' in response:
value = response['Configuration'].get(key, default)
return value
def _get_response_code(self, key, default=None):
value = None
response = self._get_response
if response:
if 'Configuration' in response:
value = response['Configuration'].get(key, default)
return value
@property
def arn(self):
response = self._get_response()
return response['Configuration']['FunctionArn']
def code_sha_256(self):
return self._get_response_configuration('CodeSha256')
@property
def version(self):
response = self._get_response()
return response['Configuration']['Version']
def arn(self):
return self._get_response_configuration('FunctionArn')
@property
def repository_type(self):
response = self._get_response()
return response['Code']['RepositoryType']
return self._get_response_code('RepositoryType')
@property
def location(self):
response = self._get_response()
return response['Code']['Location']
return self._get_response_code('Location')
@property
def version(self):
return self._get_response_configuration('Version')
def exists(self):
return self._get_response()
def _check_function_md5(self):
changed = True
self._copy_config_file()
self.zip_lambda_function(self.zipfile_name, self.path)
m = hashlib.md5()
with open(self.zipfile_name, 'rb') as fp:
m.update(fp.read())
zip_md5 = m.hexdigest()
cached_md5 = self._context.get_cache_value('zip_md5')
LOG.debug('zip_md5: %s', zip_md5)
LOG.debug('cached md5: %s', cached_md5)
if zip_md5 != cached_md5:
self._context.set_cache_value('zip_md5', zip_md5)
else:
changed = False
LOG.info('function unchanged')
return changed
def _check_config_md5(self):
m = hashlib.md5()
m.update(self.description)
m.update(self.handler)
m.update(str(self.memory_size))
m.update(self._context.exec_role_arn)
m.update(str(self.timeout))
config_md5 = m.hexdigest()
cached_md5 = self._context.get_cache_value('config_md5')
if config_md5 != cached_md5:
self._context.set_cache_value('config_md5', config_md5)
changed = True
else:
changed = False
return changed
@property
def log(self):
if self._log is None:
......@@ -181,13 +227,13 @@ class Function(object):
config_path = os.path.join(self.path, config_name)
if os.path.exists(config_path):
dest_path = os.path.join(self.path, 'config.json')
LOG.info('copy %s to %s', config_path, dest_path)
shutil.copyfile(config_path, dest_path)
LOG.debug('copy %s to %s', config_path, dest_path)
shutil.copy2(config_path, dest_path)
def create(self):
LOG.info('creating function %s', self.name)
self._copy_config_file()
self.zip_lambda_function(self.zipfile_name, self.path)
self._check_function_md5()
self._check_config_md5()
with open(self.zipfile_name, 'rb') as fp:
exec_role = self._context.exec_role_arn
LOG.debug('exec_role=%s', exec_role)
......@@ -202,29 +248,17 @@ class Function(object):
Handler=self.handler,
Description=self.description,
Timeout=self.timeout,
MemorySize=self.memory_size)
MemorySize=self.memory_size,
Publish=True)
LOG.debug(response)
except Exception:
LOG.exception('Unable to upload zip file')
self.add_permissions()
def _do_update(self):
do_update = False
if self._context.force:
do_update = True
else:
stats = os.stat(self.zipfile_name)
if self._context.cache.get('zipfile_size') != stats.st_size:
self._context.cache['zipfile_size'] = stats.st_size
do_update = True
return do_update
def update(self):
LOG.info('updating %s', self.name)
self._copy_config_file()
self.zip_lambda_function(self.zipfile_name, self.path)
if self._do_update():
self._context.save_cache()
if self._check_function_md5():
self._response = None
with open(self.zipfile_name, 'rb') as fp:
try:
LOG.info('uploading new function zipfile %s',
......@@ -233,33 +267,40 @@ class Function(object):
response = self._lambda_client.call(
'update_function_code',
FunctionName=self.name,
ZipFile=zipdata)
ZipFile=zipdata,
Publish=True)
LOG.debug(response)
except Exception:
LOG.exception('unable to update zip file')
def update_configuration(self):
if self._check_config_md5():
self._response = None
LOG.info('updating configuration for %s', self.name)
exec_role = self._context.exec_role_arn
LOG.debug('exec_role=%s', exec_role)
try:
response = self._lambda_client.call(
'update_function_configuration',
FunctionName=self.name,
Role=exec_role,
Handler=self.handler,
Description=self.description,
Timeout=self.timeout,
MemorySize=self.memory_size)
LOG.debug(response)
except Exception:
LOG.exception('unable to update function configuration')
else:
LOG.info('function has not changed')
LOG.info('function configuration has not changed')
def deploy(self):
if self.exists():
self.update_configuration()
return self.update()
return self.create()
def publish_version(self, description):
LOG.info('publishing version of %s', self.name)
try:
response = self._lambda_client.call(
'publish_version',
FunctionName=self.name,
CodeSha256=self.code_sha_256,
Description=description)
LOG.debug(response)
except Exception:
LOG.exception('Unable to publish version')
return response['Version']
def list_versions(self):
LOG.info('listing versions of %s', self.name)
try:
response = self._lambda_client.call(
'list_versions_by_function',
......@@ -270,15 +311,26 @@ class Function(object):
return response['Versions']
def create_alias(self, name, description, version=None):
LOG.info('creating alias of %s', self.name)
if version is None:
version = self.version
# Find the current (latest) version by version number
# First find the SHA256 of $LATEST
if not version:
versions = self.list_versions()
for version in versions:
if version['Version'] == '$LATEST':
latest_sha256 = version['CodeSha256']
break
for version in versions:
if version['Version'] != '$LATEST':
if version['CodeSha256'] == latest_sha256:
version = version['Version']
break
try:
LOG.info('creating alias %s=%s', name, version)
response = self._lambda_client.call(
'create_alias',
FunctionName=self.name,
Description=description,
FunctionVersion=self.version,
FunctionVersion=version,
Name=name)
LOG.debug(response)
except Exception:
......@@ -297,8 +349,7 @@ class Function(object):
return response['Versions']
def tag(self, name, description):
version = self.publish_version(description)
self.create_alias(name, description, version)
self.create_alias(name, description)
def delete(self):
LOG.info('deleting function %s', self.name)
......@@ -332,17 +383,14 @@ class Function(object):
InvokeArgs=fp)
LOG.debug(response)
def _invoke(self, test_data, invocation_type):
if test_data is None:
test_data = self.test_data
LOG.debug('invoke %s', test_data)
with open(test_data) as fp:
response = self._lambda_client.call(
'invoke',
FunctionName=self.name,
InvocationType=invocation_type,
LogType='Tail',
Payload=fp.read())
def _invoke(self, data, invocation_type):
LOG.debug('invoke %s as %s', self.name, invocation_type)
response = self._lambda_client.call(
'invoke',
FunctionName=self.name,
InvocationType=invocation_type,
LogType='Tail',
Payload=data)
LOG.debug(response)
return response
......
......@@ -131,11 +131,11 @@ class Policy(object):
m = hashlib.md5()
m.update(document)
policy_md5 = m.hexdigest()
cached_md5 = self._context.get_cache_value('policy_md5')
LOG.debug('policy_md5: %s', policy_md5)
LOG.debug('cache md5: %s', self._context.cache.get('policy_md5'))
if policy_md5 != self._context.cache.get('policy_md5'):
self._context.cache['policy_md5'] = policy_md5
self._context.save_cache()
LOG.debug('cached md5: %s', cached_md5)
if policy_md5 != cached_md5:
self._context.set_cache_value('policy_md5', policy_md5)
self._add_policy_version()
else:
LOG.info('policy unchanged')
......@@ -158,6 +158,20 @@ class Policy(object):
document = self.document()
if self.arn and document:
LOG.info('deleting policy %s', self.name)
LOG.info('deleting all policy versions for %s', self.name)
versions = self._list_versions()
for version in versions:
LOG.debug('deleting version %s', version['VersionId'])
if not version['IsDefaultVersion']:
try:
response = self._iam_client.call(
'delete_policy_version',
PolicyArn=self.arn,
VersionId=version['VersionId'])
except Exception:
LOG.exception('Unable to delete policy version %s',
version['VersionId'])
LOG.debug('now delete policy')
response = self._iam_client.call(
'delete_policy', PolicyArn=self.arn)
LOG.debug(response)
......
......@@ -19,12 +19,16 @@ import click
from kappa.context import Context
pass_ctx = click.make_pass_decorator(Context)
@click.group()
@click.argument(
'config',
@click.option(
'--config',
default='kappa.yml',
type=click.File('rb'),
envvar='KAPPA_CONFIG',
help='Name of config file (default is kappa.yml)'
)
@click.option(
'--debug/--no-debug',
......@@ -32,117 +36,63 @@ from kappa.context import Context
help='Turn on debugging output'
)
@click.option(
'--environment',
help='Specify which environment to work with'
)
@click.option(
'--force/--no-force',
default=False,
help='Force an update of the Lambda function'
'--env',
default='dev',
help='Specify which environment to work with (default dev)'
)
@click.pass_context
def cli(ctx, config=None, debug=False, environment=None, force=None):
config = config
ctx.obj['debug'] = debug
ctx.obj['config'] = config
ctx.obj['environment'] = environment
ctx.obj['force'] = force
def cli(ctx, config=None, debug=False, env=None):
ctx.obj = Context(config, env, debug)
@cli.command()
@click.pass_context
@pass_ctx
def deploy(ctx):
"""Deploy the Lambda function and any policies and roles required"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('deploying')
context.deploy()
click.echo('done')
@cli.command()
@click.pass_context
def tag(ctx):
"""Deploy the Lambda function and any policies and roles required"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('tagging')
context.deploy()
ctx.deploy()
click.echo('done')
@cli.command()
@click.pass_context
def invoke(ctx):
@click.argument('data_file', type=click.File('r'))
@pass_ctx
def invoke(ctx, data_file):
"""Invoke the command synchronously"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('invoking')
response = context.invoke()
response = ctx.invoke(data_file.read())
log_data = base64.b64decode(response['LogResult'])
click.echo(log_data)
click.echo('Response:')
click.echo(response['Payload'].read())
click.echo('done')
@cli.command()
@click.pass_context
@pass_ctx
def test(ctx):
"""Test the command synchronously"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('testing')
response = context.test()
log_data = base64.b64decode(response['LogResult'])
click.echo(log_data)
click.echo(response['Payload'].read())
click.echo('done')
@cli.command()
@click.pass_context
def dryrun(ctx):
"""Show you what would happen but don't actually do anything"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('invoking dryrun')
response = context.dryrun()
click.echo(response)
click.echo('done')
@cli.command()
@click.pass_context
def invoke_async(ctx):
"""Invoke the Lambda function asynchronously"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('invoking async')
response = context.invoke_async()
click.echo(response)
ctx.test()
click.echo('done')
@cli.command()
@click.pass_context
@pass_ctx
def tail(ctx):
"""Show the last 10 lines of the log file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('tailing logs')
for e in context.tail()[-10:]:
for e in ctx.tail()[-10:]:
ts = datetime.utcfromtimestamp(e['timestamp']//1000).isoformat()
click.echo("{}: {}".format(ts, e['message']))
click.echo('done')
@cli.command()
@click.pass_context
@pass_ctx
def status(ctx):
"""Print a status of this Lambda function"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'])
status = context.status()
status = ctx.status()
click.echo(click.style('Policy', bold=True))
if status['policy']:
line = ' {} ({})'.format(
......@@ -175,58 +125,46 @@ def status(ctx):
@cli.command()
@click.pass_context
@pass_ctx
def delete(ctx):
"""Delete the Lambda function and related policies and roles"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('deleting')
context.delete()
ctx.delete()
click.echo('done')
@cli.command()
@click.pass_context
def add_event_sources(ctx):
@click.option(
'--command',
type=click.Choice(['add', 'update', 'enable', 'disable']),
help='Operation to perform on event sources')
@pass_ctx
def event_sources(ctx, command):
"""Add any event sources specified in the config file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('adding event sources')
context.add_event_sources()
click.echo('done')
if command == 'add':
click.echo('adding event sources')
ctx.add_event_sources()
click.echo('done')
elif command == 'update':
click.echo('updating event sources')
ctx.update_event_sources()
click.echo('done')
elif command == 'enable':
click.echo('enabling event sources')
ctx.enable_event_sources()
click.echo('done')
elif command == 'disable':
click.echo('enabling event sources')
ctx.disable_event_sources()
click.echo('done')
@cli.command()
@click.pass_context
def update_event_sources(ctx):
"""Update event sources specified in the config file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('updating event sources')
context.update_event_sources()
@click.argument('name')
@click.argument('description')
@pass_ctx
def tag(ctx, name, description):
"""Tag the current function version with a symbolic name"""
click.echo('creating tag for function')
ctx.tag(name, description)
click.echo('done')
@cli.command()
@click.pass_context
def enable_event_sources(ctx):
"""Enable event sources specified in the config file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('enabling event sources')
context.enable_event_sources()
click.echo('done')
@cli.command()
@click.pass_context
def disable_event_sources(ctx):
"""Disable event sources specified in the config file"""
context = Context(ctx.obj['config'], ctx.obj['environment'],
ctx.obj['debug'], ctx.obj['force'])
click.echo('enabling event sources')
context.disable_event_sources()
click.echo('done')
cli(obj={})
......
boto3>=1.2.0
boto3>=1.2.2
click==5.1
PyYAML>=3.11
mock>=1.0.1
......
A Simple Python Example
=======================
In this Python example, we will build a Lambda function that can be hooked up
to methods in API Gateway to provide a simple CRUD REST API that persists JSON
objects in DynamoDB.
To implement this, we will create a single Lambda function that will be
associated with the GET, POST, PUT, and DELETE HTTP methods of a single API
Gateway resource. We will show the API Gateway connections later. For now, we
will focus on our Lambda function.
Installing Dependencies
-----------------------
Put all dependencies in the `requirements.txt` file in this directory and then
run the following command to install them in this directory prior to uploading
the code.
$ pip install -r requirements.txt -t /full/path/to/this/code
This will install all of the dependencies inside the code directory so they can
be bundled with your own code and deployed to Lambda.
The ``setup.cfg`` file in this directory is required if you are running on
MacOS and are using brew. It may not be needed on other platforms.
The Code Is Here!
=================
At the moment, the contents of this directory are created by hand but when
LambdaPI is complete, the basic framework would be created for you. You would
have a Python source file that works but doesn't actually do anything. And the
config.json file here would be created on the fly at deployment time. The
correct resource names and other variables would be written into the config
file and then then config file would get bundled up with the code. You can
then load the config file at run time in the Lambda Python code so you don't
have to hardcode resource names in your code.
Installing Dependencies
-----------------------
Put all dependencies in the `requirements.txt` file in this directory and then
run the following command to install them in this directory prior to uploading
the code.
$ pip install -r requirements.txt -t /full/path/to/this/code
This will install all of the dependencies inside the code directory so they can
be bundled with your own code and deployed to Lambda.
The ``setup.cfg`` file in this directory is required if you are running on
MacOS and are using brew. It may not be needed on other platforms.
{
"region_name": "us-west-2",
"sample_table": "kappa-python-sample"
}
{
"region_name": "us-west-2",
"sample_table": "kappa-python-sample"
}
git+ssh://git@github.com/garnaat/petard.git
import logging
import json
import uuid
import boto3
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
# The kappa deploy command will make sure that the right config file
# for this environment is available in the local directory.
config = json.load(open('config.json'))
session = boto3.Session(region_name=config['region_name'])
ddb_client = session.resource('dynamodb')
table = ddb_client.Table(config['sample_table'])
def foobar():
return 42
def _get(event, context):
customer_id = event.get('id')
if customer_id is None:
raise Exception('No id provided for GET operation')
response = table.get_item(Key={'id': customer_id})
item = response.get('Item')
if item is None:
raise Exception('id: {} not found'.format(customer_id))
return response['Item']
def _post(event, context):
item = event['json_body']
if item is None:
raise Exception('No json_body found in event')
item['id'] = str(uuid.uuid4())
table.put_item(Item=item)
return item
def _put(event, context):
data = _get(event, context)
id = data.get('id')
data.update(event['json_body'])
# don't allow the id to be changed
data['id'] = id
table.put_item(Item=data)
return data
def handler(event, context):
LOG.info(event)
http_method = event.get('http_method')
if not http_method:
return 'NoHttpMethodSupplied'
if http_method == 'GET':
return _get(event, context)
elif http_method == 'POST':
return _post(event, context)
elif http_method == 'PUT':
return _put(event, context)
elif http_method == 'DELETE':
return _put(event, context)
else:
raise Exception('UnsupportedMethod: {}'.format(http_method))
{
"http_method": "GET",
"id": "4a407fc2-da7a-41e9-8dc6-8a057b6b767a"
}
{
"http_method": "POST",
"json_body": {
"foo": "This is the foo value",
"bar": "This is the bar value"
}
}
import unittest
import simple
class TestSimple(unittest.TestCase):
def test_foobar(self):
self.assertEqual(simple.foobar(), 42)
---
name: kappa-python-sample
environments:
dev:
profile: <your dev profile>
region: <your dev region e.g. us-west-2>
policy:
resources:
- arn: arn:aws:dynamodb:us-west-2:123456789012:table/kappa-python-sample
actions:
- "*"
- arn: arn:aws:logs:*:*:*
actions:
- "*"
prod:
profile: <your prod profile>
region: <your prod region e.g. us-west-2>
policy_resources:
- arn: arn:aws:dynamodb:us-west-2:234567890123:table/kappa-python-sample
actions:
- "*"
- arn: arn:aws:logs:*:*:*
actions:
- "*"
lambda:
description: A simple Python sample
handler: simple.handler
runtime: python2.7
memory_size: 256
timeout: 3
\ No newline at end of file
......@@ -5,7 +5,7 @@ from setuptools import setup, find_packages
import os
requires = [
'boto3>=1.2.0',
'boto3>=1.2.2',
'click>=5.0',
'PyYAML>=3.11'
]
......