| import json |
| import sqlite3 |
| from pathlib import Path |
| from datetime import datetime, timezone |
| from typing import NamedTuple, Iterable, Iterator, List |
| from functools import wraps |
| from flask import Flask, render_template, abort, g, \ |
| redirect, request, url_for, make_response, send_file, \ |
| Response |
| from base64 import b64decode |
| |
| |
| from spaceauth import SpaceAuth, login_required, current_user, cap_required |
| |
| # device infomation stored in database |
| class DeviceInfo(NamedTuple): |
| hwaddr: str |
| name: str |
| owner: str |
| ignored: bool |
| |
| def v4addr(): |
| if request.headers.getlist("X-Forwarded-For"): |
| r_addr = request.headers.getlist("X-Forwarded-For")[-1] |
| else: |
| r_addr = request.remote_addr |
| if r_addr.startswith('::ffff:'): |
| r_addr = r_addr[7:] |
| return r_addr |
| |
| |
| def req_to_ctx(): |
| return dict(iter(request.form.items())) |
| |
| def get_device_info(conn: sqlite3.Connection, hwaddr: str) -> DeviceInfo: |
| return list(get_device_infos(conn, (hwaddr,)))[0] |
| |
| |
| def get_device_infos(conn: sqlite3.Connection, hwaddrs: Iterable[str]) -> Iterator[DeviceInfo]: |
| hwaddrs = list(hwaddrs) |
| in_clause = '({})'.format(', '.join(['?'] * len(hwaddrs))) |
| stmt = '''select hwaddr, name, ignored, owner from |
| devices where devices.hwaddr in ''' + in_clause |
| for row in conn.execute(stmt, hwaddrs): |
| owner = row['owner'] or '' |
| ignored = row['ignored'] |
| yield DeviceInfo(row['hwaddr'], row['name'], owner, ignored) |
| |
| def app(instance_path, devices_api, config): |
| app = Flask('at', instance_path=instance_path, instance_relative_config=True) |
| app.config.update(config) |
| app.jinja_env.add_extension('jinja2.ext.i18n') |
| app.jinja_env.install_null_translations() |
| app.updater = devices_api |
| |
| if app.config.get('PROXY_FIX'): |
| from werkzeug.middleware.proxy_fix import ProxyFix |
| app.wsgi_app = ProxyFix(app.wsgi_app) |
| |
| app.space_auth = SpaceAuth(app) |
| |
| |
| def auth_get_user(): |
| if config.get('DEBUG', False): |
| if "User" in request.headers: |
| return request.headers.get("User") |
| if "Authorization" in request.headers: |
| raw = b64decode(request.headers.get('Authorization').split(' ')[1]) |
| app.logger.info(f'Raw authorization: {raw!s}') |
| return raw.decode().split(':')[0] |
| app.logger.info(request.headers) |
| raise Exception('username not supplied') |
| else: |
| return current_user.id |
| |
| def auth_login_required(f): |
| if config.get('DEBUG', False): |
| @wraps(f) |
| def wrapper(*args, **kwargs): |
| try: |
| auth_get_user() |
| except Exception: |
| app.logger.exception("auth get exception") |
| response = make_response('', 401) |
| response.headers['WWW-Authenticate'] = 'Basic realm="at.hackerspace.pl", charset="UTF-8"' |
| return response |
| return f(*args, **kwargs) |
| return wrapper |
| else: |
| return login_required(f) |
| |
| |
| def restrict_ip(prefixes : List[str] = [], exclude : List[str] = []): |
| def decorator(f): |
| @wraps(f) |
| def func(*a, **kw): |
| r_addr = v4addr() |
| if r_addr in exclude: |
| app.logger.info('got IP %s, rejecting', r_addr) |
| return render_template('invalid_ip.html', ip_address=r_addr), 403 |
| |
| for prefix in prefixes: |
| if r_addr.startswith(prefix): |
| break |
| else: |
| app.logger.info('got IP %s, rejecting', r_addr) |
| return render_template('invalid_ip.html', ip_address=r_addr), 403 |
| |
| return f(*a, **kw) |
| return func |
| return decorator |
| |
| |
| @app.template_filter('strfts') |
| def strfts(ts, format='%Y-%m-%d %H:%M'): |
| return datetime.utcfromtimestamp(ts).strftime(format) |
| |
| @app.template_filter('utcisoformat') |
| def utcisoformat(ts): |
| return datetime.utcfromtimestamp(ts).replace( |
| tzinfo=timezone.utc).isoformat() |
| |
| |
| @app.template_filter('wikiurl') |
| def wikiurl(user): |
| return app.config['WIKI_URL'].replace("${login}", user) |
| |
| |
| @app.before_request |
| def make_connection(): |
| conn = sqlite3.connect(app.config['DB']) |
| conn.row_factory = sqlite3.Row |
| conn.isolation_level = None # for autocommit mode |
| g.db = conn |
| |
| |
| @app.teardown_request |
| def close_connection(exception): |
| g.db.close() |
| |
| |
| @app.route('/') |
| def main_view(): |
| return render_template('main.html', **now_at()) |
| |
| @app.route('/metrics') |
| def metrics_view(): |
| """Render count of different entities, per kind, in Prometheus format.""" |
| now = now_at() |
| lines = [ |
| "# HELP entities present at the hackerspace according to checkinator / DHCP", |
| "# TYPE people gauge", |
| ] |
| for kind, devices in now.items(): |
| # The kind is being directly text-pasted into the metric below - |
| # let's make sure a new kind with special characters doesn't mess |
| # things up too much. |
| if '"' in kind: |
| continue |
| # Not using formatting, as the Prometheus format contains '{' and |
| # '}' characters which throw off Python's .format(). |
| line = 'checkinator_now_present_entities{entity_kind="' + kind + '"} ' |
| line += str(len(devices)) |
| lines.append(line) |
| return Response('\n'.join(lines), mimetype='text/plain') |
| |
| @app.route('/api') |
| def list_all(): |
| data = now_at() |
| |
| def prettify_user(xxx_todo_changeme): |
| (user, atime) = xxx_todo_changeme |
| if user == 'greenmaker': |
| user = 'dreammaker' |
| return { |
| 'login': user, |
| 'timestamp': atime, |
| 'pretty_time': strfts(atime), |
| } |
| result = {} |
| result['users'] = list(map(prettify_user, data.pop('users'))) |
| result.update((k, len(v)) for k, v in list(data.items())) |
| res = make_response(json.dumps(result), 200) |
| res.headers['Access-Control-Allow-Origin'] = '*' |
| return res |
| |
| def now_at(): |
| result = dict() |
| devices = app.updater.get_active_devices() |
| macs = list(devices.keys()) |
| |
| identified_devices = list(get_device_infos(g.db, macs)) |
| unknown = set(macs) - set(d.hwaddr for d in identified_devices) |
| |
| # das kektop sorting maschine |
| # identify special devices |
| for name, prefixes in app.config['SPECIAL_DEVICES'].items(): |
| result[name] = set() |
| prefixes = tuple(prefixes) # startswith accepts tuple as argument |
| for hwaddr in list(unknown): |
| if hwaddr.startswith(prefixes): |
| result[name].add(hwaddr) |
| unknown.discard(hwaddr) |
| |
| result['unknown'] = unknown |
| |
| users = {} |
| for info in identified_devices: |
| # append device to user |
| last_seen = users.get(info.owner, 0) |
| if not info.ignored: |
| last_seen = max(last_seen, devices[info.hwaddr].atime) |
| users[info.owner] = last_seen |
| |
| result['users'] = sorted(users.items(), key=lambda u_l: (u_l[1], u_l[0]), reverse=True) |
| |
| return result |
| |
| |
| restrict_to_hs = restrict_ip(prefixes=app.config['CLAIMABLE_PREFIXES'], |
| exclude=app.config['CLAIMABLE_EXCLUDE']) |
| |
| |
| @app.route('/claim', methods=['GET']) |
| @restrict_to_hs |
| @auth_login_required |
| def claim_form(): |
| hwaddr, name = app.updater.get_device(v4addr()) |
| return render_template('claim.html', hwaddr=hwaddr, name=name) |
| |
| @app.route('/my_ip', methods=['GET']) |
| def get_my_ip(): |
| ip = v4addr() |
| hwaddr, name = app.updater.get_device(ip) |
| return f'ip: {ip!r}\nhwaddr: {hwaddr!r}\nhostname: {name!r}\n' |
| |
| @app.route('/claim', methods=['POST']) |
| @restrict_to_hs |
| @auth_login_required |
| def claim(): |
| hwaddr, lease_name = app.updater.get_device(v4addr()) |
| ctx = None |
| if not hwaddr: |
| ctx = dict(error='Invalid device.') |
| else: |
| login = auth_get_user() |
| try: |
| g.db.execute(''' |
| insert into devices (hwaddr, name, owner, ignored) values (?, ?, ?, ?)''', |
| [hwaddr, request.form['name'], login, False]) |
| ctx = {} |
| except sqlite3.Error: |
| error = 'Could not add device! Perhaps someone claimed it?' |
| ctx = dict(error=error) |
| return render_template('post_claim.html', **ctx) |
| |
| def get_user_devices(conn, user): |
| devs = conn.execute('select hwaddr, name, ignored from devices where\ |
| owner = ?', [user]) |
| device_info = [] |
| for row in devs: |
| di = DeviceInfo(row['hwaddr'], row['name'], user, row['ignored']) |
| device_info.append(di) |
| return device_info |
| |
| |
| @app.route('/account', methods=['GET']) |
| @auth_login_required |
| def account(): |
| devices = get_user_devices(g.db, auth_get_user()) |
| return render_template('account.html', devices=devices) |
| |
| |
| def set_ignored(conn, hwaddr, user, value): |
| return conn.execute(''' |
| update devices set ignored = ? where hwaddr = ? and owner = ?''', |
| [value, hwaddr, user]) |
| |
| |
| def delete_device(conn, hwaddr, user): |
| return conn.execute(''' |
| delete from devices where hwaddr = ? and owner = ?''', |
| [hwaddr, user]) |
| |
| @app.route('/devices/<id>/<action>/') |
| @auth_login_required |
| def device(id, action): |
| user = auth_get_user() |
| if action == 'hide': |
| set_ignored(g.db, id, user, True) |
| if action == 'show': |
| set_ignored(g.db, id, user, False) |
| if action == 'delete': |
| delete_device(g.db, id, user) |
| return redirect(url_for('account')) |
| |
| |
| @app.route('/admin') |
| @cap_required('staff') |
| def admin(): |
| data = now_at() |
| return render_template('admin.html', data=data) |
| |
| @app.route('/static/css/basic.css') |
| def css(): |
| return send_file(str(Path('./static/css/basic.css').absolute())) |
| |
| return app |