blob: f85aa25b990613365db3296565b34ecb72ab8edf [file] [log] [blame]
vuko3cd087d2021-12-28 13:19:40 +01001import json
2import sqlite3
3from pathlib import Path
vuko43069942021-12-28 21:39:28 +01004from datetime import datetime, timezone
vuko3cd087d2021-12-28 13:19:40 +01005from typing import NamedTuple, Iterable, Iterator, List
6from functools import wraps
7from flask import Flask, render_template, abort, g, \
8 redirect, request, url_for, make_response, send_file, \
9 Response
10from base64 import b64decode
11
12
13from spaceauth import SpaceAuth, login_required, current_user, cap_required
14
15# device infomation stored in database
16class DeviceInfo(NamedTuple):
17 hwaddr: str
18 name: str
19 owner: str
20 ignored: bool
21
22def v4addr():
23 if request.headers.getlist("X-Forwarded-For"):
24 r_addr = request.headers.getlist("X-Forwarded-For")[-1]
25 else:
26 r_addr = request.remote_addr
27 if r_addr.startswith('::ffff:'):
28 r_addr = r_addr[7:]
29 return r_addr
30
31
32def req_to_ctx():
33 return dict(iter(request.form.items()))
34
35def get_device_info(conn: sqlite3.Connection, hwaddr: str) -> DeviceInfo:
36 return list(get_device_infos(conn, (hwaddr,)))[0]
37
38
39def get_device_infos(conn: sqlite3.Connection, hwaddrs: Iterable[str]) -> Iterator[DeviceInfo]:
40 hwaddrs = list(hwaddrs)
41 in_clause = '({})'.format(', '.join(['?'] * len(hwaddrs)))
42 stmt = '''select hwaddr, name, ignored, owner from
43 devices where devices.hwaddr in ''' + in_clause
44 for row in conn.execute(stmt, hwaddrs):
45 owner = row['owner'] or ''
46 ignored = row['ignored']
47 yield DeviceInfo(row['hwaddr'], row['name'], owner, ignored)
48
49def app(instance_path, devices_api, config):
50 app = Flask('at', instance_path=instance_path, instance_relative_config=True)
51 app.config.update(config)
52 app.jinja_env.add_extension('jinja2.ext.i18n')
53 app.jinja_env.install_null_translations()
54 app.updater = devices_api
55
56 if app.config.get('PROXY_FIX'):
vukobd124bd2021-12-28 15:05:59 +010057 from werkzeug.middleware.proxy_fix import ProxyFix
vuko3cd087d2021-12-28 13:19:40 +010058 app.wsgi_app = ProxyFix(app.wsgi_app)
59
60 app.space_auth = SpaceAuth(app)
61
62
63 def auth_get_user():
64 if config.get('DEBUG', False):
65 if "User" in request.headers:
66 return request.headers.get("User")
67 if "Authorization" in request.headers:
68 raw = b64decode(request.headers.get('Authorization').split(' ')[1])
69 app.logger.info(f'Raw authorization: {raw!s}')
70 return raw.decode().split(':')[0]
71 app.logger.info(request.headers)
72 raise Exception('username not supplied')
73 else:
74 return current_user.id
75
76 def auth_login_required(f):
77 if config.get('DEBUG', False):
78 @wraps(f)
79 def wrapper(*args, **kwargs):
80 try:
81 auth_get_user()
82 except Exception:
83 app.logger.exception("auth get exception")
84 response = make_response('', 401)
85 response.headers['WWW-Authenticate'] = 'Basic realm="at.hackerspace.pl", charset="UTF-8"'
86 return response
87 return f(*args, **kwargs)
88 return wrapper
89 else:
90 return login_required(f)
91
92
93 def restrict_ip(prefixes : List[str] = [], exclude : List[str] = []):
94 def decorator(f):
95 @wraps(f)
96 def func(*a, **kw):
97 r_addr = v4addr()
98 if r_addr in exclude:
99 app.logger.info('got IP %s, rejecting', r_addr)
100 return render_template('invalid_ip.html', ip_address=r_addr), 403
101
102 for prefix in prefixes:
103 if r_addr.startswith(prefix):
104 break
105 else:
106 app.logger.info('got IP %s, rejecting', r_addr)
107 return render_template('invalid_ip.html', ip_address=r_addr), 403
108
109 return f(*a, **kw)
110 return func
111 return decorator
112
113
114 @app.template_filter('strfts')
vuko43069942021-12-28 21:39:28 +0100115 def strfts(ts, format='%Y-%m-%d %H:%M'):
116 return datetime.utcfromtimestamp(ts).strftime(format)
117
118 @app.template_filter('utcisoformat')
119 def utcisoformat(ts):
120 return datetime.utcfromtimestamp(ts).replace(
121 tzinfo=timezone.utc).isoformat()
vuko3cd087d2021-12-28 13:19:40 +0100122
123
124 @app.template_filter('wikiurl')
125 def wikiurl(user):
126 return app.config['WIKI_URL'].replace("${login}", user)
127
128
129 @app.before_request
130 def make_connection():
131 conn = sqlite3.connect(app.config['DB'])
132 conn.row_factory = sqlite3.Row
133 conn.isolation_level = None # for autocommit mode
134 g.db = conn
135
136
137 @app.teardown_request
138 def close_connection(exception):
139 g.db.close()
140
141
142 @app.route('/')
143 def main_view():
144 return render_template('main.html', **now_at())
145
146 @app.route('/metrics')
147 def metrics_view():
148 """Render count of different entities, per kind, in Prometheus format."""
149 now = now_at()
150 lines = [
151 "# HELP entities present at the hackerspace according to checkinator / DHCP",
152 "# TYPE people gauge",
153 ]
154 for kind, devices in now.items():
155 # The kind is being directly text-pasted into the metric below -
156 # let's make sure a new kind with special characters doesn't mess
157 # things up too much.
158 if '"' in kind:
159 continue
160 # Not using formatting, as the Prometheus format contains '{' and
161 # '}' characters which throw off Python's .format().
162 line = 'checkinator_now_present_entities{entity_kind="' + kind + '"} '
163 line += str(len(devices))
164 lines.append(line)
165 return Response('\n'.join(lines), mimetype='text/plain')
166
167 @app.route('/api')
168 def list_all():
169 data = now_at()
170
171 def prettify_user(xxx_todo_changeme):
172 (user, atime) = xxx_todo_changeme
173 if user == 'greenmaker':
174 user = 'dreammaker'
175 return {
176 'login': user,
177 'timestamp': atime,
178 'pretty_time': strfts(atime),
179 }
180 result = {}
181 result['users'] = list(map(prettify_user, data.pop('users')))
182 result.update((k, len(v)) for k, v in list(data.items()))
183 res = make_response(json.dumps(result), 200)
184 res.headers['Access-Control-Allow-Origin'] = '*'
185 return res
186
187 def now_at():
188 result = dict()
189 devices = app.updater.get_active_devices()
190 macs = list(devices.keys())
191
192 identified_devices = list(get_device_infos(g.db, macs))
193 unknown = set(macs) - set(d.hwaddr for d in identified_devices)
194
195 # das kektop sorting maschine
196 # identify special devices
197 for name, prefixes in app.config['SPECIAL_DEVICES'].items():
198 result[name] = set()
199 prefixes = tuple(prefixes) # startswith accepts tuple as argument
200 for hwaddr in list(unknown):
201 if hwaddr.startswith(prefixes):
202 result[name].add(hwaddr)
203 unknown.discard(hwaddr)
204
205 result['unknown'] = unknown
206
207 users = {}
208 for info in identified_devices:
209 # append device to user
210 last_seen = users.get(info.owner, 0)
211 if not info.ignored:
212 last_seen = max(last_seen, devices[info.hwaddr].atime)
213 users[info.owner] = last_seen
214
215 result['users'] = sorted(users.items(), key=lambda u_l: (u_l[1], u_l[0]), reverse=True)
216
217 return result
218
219
220 restrict_to_hs = restrict_ip(prefixes=app.config['CLAIMABLE_PREFIXES'],
221 exclude=app.config['CLAIMABLE_EXCLUDE'])
222
223
224 @app.route('/claim', methods=['GET'])
225 @restrict_to_hs
226 @auth_login_required
227 def claim_form():
228 hwaddr, name = app.updater.get_device(v4addr())
229 return render_template('claim.html', hwaddr=hwaddr, name=name)
230
231 @app.route('/my_ip', methods=['GET'])
232 def get_my_ip():
233 ip = v4addr()
234 hwaddr, name = app.updater.get_device(ip)
235 return f'ip: {ip!r}\nhwaddr: {hwaddr!r}\nhostname: {name!r}\n'
236
237 @app.route('/claim', methods=['POST'])
238 @restrict_to_hs
239 @auth_login_required
240 def claim():
241 hwaddr, lease_name = app.updater.get_device(v4addr())
242 ctx = None
243 if not hwaddr:
244 ctx = dict(error='Invalid device.')
245 else:
246 login = auth_get_user()
247 try:
248 g.db.execute('''
249 insert into devices (hwaddr, name, owner, ignored) values (?, ?, ?, ?)''',
250 [hwaddr, request.form['name'], login, False])
251 ctx = {}
252 except sqlite3.Error:
253 error = 'Could not add device! Perhaps someone claimed it?'
254 ctx = dict(error=error)
255 return render_template('post_claim.html', **ctx)
256
257 def get_user_devices(conn, user):
258 devs = conn.execute('select hwaddr, name, ignored from devices where\
259 owner = ?', [user])
260 device_info = []
261 for row in devs:
262 di = DeviceInfo(row['hwaddr'], row['name'], user, row['ignored'])
263 device_info.append(di)
264 return device_info
265
266
267 @app.route('/account', methods=['GET'])
268 @auth_login_required
269 def account():
270 devices = get_user_devices(g.db, auth_get_user())
271 return render_template('account.html', devices=devices)
272
273
274 def set_ignored(conn, hwaddr, user, value):
275 return conn.execute('''
276 update devices set ignored = ? where hwaddr = ? and owner = ?''',
277 [value, hwaddr, user])
278
279
280 def delete_device(conn, hwaddr, user):
281 return conn.execute('''
282 delete from devices where hwaddr = ? and owner = ?''',
283 [hwaddr, user])
284
285 @app.route('/devices/<id>/<action>/')
286 @auth_login_required
287 def device(id, action):
288 user = auth_get_user()
289 if action == 'hide':
290 set_ignored(g.db, id, user, True)
291 if action == 'show':
292 set_ignored(g.db, id, user, False)
293 if action == 'delete':
294 delete_device(g.db, id, user)
295 return redirect(url_for('account'))
296
297
298 @app.route('/admin')
299 @cap_required('staff')
300 def admin():
301 data = now_at()
302 return render_template('admin.html', data=data)
303
304 @app.route('/static/css/basic.css')
305 def css():
306 return send_file(str(Path('./static/css/basic.css').absolute()))
307
308 return app