#!/usr/bin/python3
# Copyright (C) 2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


import copy
import json
import logging
import os
import sys
import uuid

import bottle


class Muck:
    """In-memory store of objects, keyed by generated string identifiers."""

    def __init__(self):
        # Maps object id (str) -> stored object.
        self._objs = {}

    def __len__(self):
        """Return the number of stored objects."""
        return len(self._objs)

    def new_id(self):
        """Return a fresh random identifier (a UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    def create(self, obj):
        """Store obj under a newly generated id and return that id.

        The id is returned so the caller can later refer to the object
        with update(), show() or delete().
        """
        obj_id = self.new_id()
        self._objs[obj_id] = obj
        return obj_id

    def update(self, obj_id, obj):
        """Store obj under obj_id, replacing any previous object there."""
        self._objs[obj_id] = obj

    def show(self, obj_id):
        """Return the object stored under obj_id, or None if there is none."""
        return self._objs.get(obj_id)

    def delete(self, obj_id):
        """Remove the object stored under obj_id; unknown ids are ignored."""
        self._objs.pop(obj_id, None)

    def search(self):
        """Return a deep copy of the whole id -> object mapping."""
        return copy.deepcopy(self._objs)


class API:
    """HTTP front end that exposes a Muck store through a bottle app."""

    def __init__(self, bottleapp):
        # Create the store before registering routes, so that no callback
        # can ever be reached while the instance lacks its store.
        self._muck = Muck()
        self._add_routes(bottleapp)

    def _add_routes(self, bottleapp):
        """Register this API's callbacks with bottleapp."""
        routes = [
            {
                'method': 'GET',
                'path': '/status',
                'callback': self._show_status,
            },
            {
                'method': 'GET',
                'path': '/search',
                'callback': self._search,
            },
            {
                'method': 'POST',
                'path': '/mem',
                'callback': self._create,
            },
        ]

        for route in routes:
            bottleapp.route(**route)

    def _show_status(self):
        """Respond with the number of stored resources."""
        status = {
            'resources': len(self._muck),
        }
        return response(200, status)

    def _search(self):
        """Respond with a copy of every stored resource, keyed by id."""
        result = {
            'resources': self._muck.search(),
        }
        return response(200, result)

    def _create(self):
        """Store the JSON body of the request as a new resource.

        Responds 400 if the request is not declared as JSON or carries
        no JSON value, otherwise 201 with the stored object as the body.
        """
        r = bottle.request

        # Compare only the media type: a header such as
        # 'application/json; charset=utf-8' is still JSON.
        media_type = r.content_type.split(';', 1)[0].strip()
        if media_type != 'application/json':
            return response(400)

        obj = r.json
        if obj is None:
            # Nothing usable was parsed (e.g. an empty body); refuse to
            # store that as a resource.
            return response(400)

        logging.info('CREATE %r', repr(obj))
        self._muck.create(obj)
        return response(201, obj)


# Marks "no body given" for response(). A sentinel is used instead of None
# so that an explicit None body keeps being serialized as JSON null.
_NO_BODY = object()


def response(status, body=_NO_BODY):
    """Build a bottle.HTTPResponse with the given status code.

    If body is given it is serialized with json.dumps(); a dict body
    additionally gets a Content-Type of application/json.  If body is
    omitted the response has an empty body, which is what bare error
    responses such as response(400) need.
    """
    headers = {}
    if isinstance(body, dict):
        headers['Content-Type'] = 'application/json'

    payload = '' if body is _NO_BODY else json.dumps(body)
    return bottle.HTTPResponse(
        status=status, body=payload, headers=headers)


# The only command line argument is the name of a JSON configuration file.
# It must have a 'log' key (log file name) and may have a 'pid' key (name
# of a file to write the process id to).
with open(sys.argv[1]) as f:
    config = json.load(f)

logging.basicConfig(
    filename=config['log'], level=logging.DEBUG,
    format='%(levelname)s %(message)s')

logging.info('Effi API starts')

if config.get('pid'):
    pid = os.getpid()
    with open(config['pid'], 'w') as f:
        f.write(str(pid))

app = bottle.default_app()
api = API(app)
app.run(host='127.0.0.1', port=8080)