blob: 34b63371e2a4b7aad7439096b802005f3bb5811a [file] [log] [blame]
vuko3cd087d2021-12-28 13:19:40 +01001import json
2import sqlite3
3from pathlib import Path
4from datetime import datetime
5from typing import NamedTuple, Iterable, Iterator, List
6from functools import wraps
7from flask import Flask, render_template, abort, g, \
8 redirect, request, url_for, make_response, send_file, \
9 Response
10from base64 import b64decode
11
12
13from spaceauth import SpaceAuth, login_required, current_user, cap_required
14
class DeviceInfo(NamedTuple):
    """Device information stored in the database (one ``devices`` row)."""

    # Hardware (MAC) address; the value devices are looked up by.
    hwaddr: str
    # Name stored for the device.
    name: str
    # Login of the owning user.
    owner: str
    # Value of the ``ignored`` column for this device.
    ignored: bool
21
def v4addr():
    """Return the client address of the current request as a string.

    The last ``X-Forwarded-For`` entry is preferred over the socket peer
    address.  An IPv4-mapped IPv6 address (``::ffff:a.b.c.d``) is reduced to
    its dotted-quad form.

    Returns:
        The address string, or ``''`` when neither the header nor
        ``request.remote_addr`` provides one.
    """
    forwarded = request.headers.getlist("X-Forwarded-For")
    if forwarded:
        # A single header value may itself be a comma-separated chain
        # ("client, proxy1, proxy2").  Only the right-most entry was appended
        # by the nearest proxy; returning the whole chain would let a
        # client-supplied first hop satisfy prefix checks on the result.
        r_addr = forwarded[-1].split(',')[-1].strip()
    else:
        # remote_addr can be None (e.g. non-socket transports); fall back to
        # an empty string so callers can keep using str methods.
        r_addr = request.remote_addr or ''
    mapped_prefix = '::ffff:'
    if r_addr.startswith(mapped_prefix):
        r_addr = r_addr[len(mapped_prefix):]
    return r_addr
30
31
def req_to_ctx():
    """Return the submitted form fields of the current request as a dict."""
    return {field: value for field, value in request.form.items()}
34
def get_device_info(conn: sqlite3.Connection, hwaddr: str) -> DeviceInfo:
    """Return the stored ``DeviceInfo`` for a single hardware address.

    Raises:
        IndexError: if no device with ``hwaddr`` exists in the database.
    """
    matches = list(get_device_infos(conn, (hwaddr,)))
    return matches[0]
37
38
def get_device_infos(conn: sqlite3.Connection, hwaddrs: Iterable[str]) -> Iterator[DeviceInfo]:
    """Yield a ``DeviceInfo`` for every given hardware address found in the DB.

    Addresses without a matching ``devices`` row are silently skipped.  Rows
    are read by column name, so ``conn`` must use a name-addressable row
    factory (such as ``sqlite3.Row``).  A missing owner is reported as ``''``.
    """
    addresses = list(hwaddrs)
    # One '?' placeholder per address keeps the query fully parameterised.
    placeholders = ', '.join('?' for _ in addresses)
    query = (
        'select hwaddr, name, ignored, owner from '
        'devices where devices.hwaddr in (' + placeholders + ')'
    )
    for record in conn.execute(query, addresses):
        yield DeviceInfo(
            record['hwaddr'],
            record['name'],
            record['owner'] or '',
            record['ignored'],
        )
48
def app(instance_path, devices_api, config):
    """Build and configure the Flask application.

    Args:
        instance_path: Instance folder path handed to ``Flask``.
        devices_api: Object providing ``get_active_devices()`` and
            ``get_device(ip)``; stored on the app as ``app.updater``.
        config: Mapping merged into ``app.config``.  Keys read in this
            function: ``DB``, ``WIKI_URL``, ``SPECIAL_DEVICES``,
            ``CLAIMABLE_PREFIXES``, ``CLAIMABLE_EXCLUDE`` and the optional
            ``PROXY_FIX`` and ``DEBUG``.

    Returns:
        The ``Flask`` instance with filters, hooks and routes registered.
    """
    app = Flask('at', instance_path=instance_path, instance_relative_config=True)
    app.config.update(config)
    app.jinja_env.add_extension('jinja2.ext.i18n')
    app.jinja_env.install_null_translations()
    app.updater = devices_api

    if app.config.get('PROXY_FIX'):
        # werkzeug.contrib was removed in Werkzeug 1.0; try the current
        # location first and fall back for old installations.
        try:
            from werkzeug.middleware.proxy_fix import ProxyFix
        except ImportError:
            from werkzeug.contrib.fixers import ProxyFix
        app.wsgi_app = ProxyFix(app.wsgi_app)

    app.space_auth = SpaceAuth(app)

    def auth_get_user():
        """Return the login of the user making the current request.

        Outside debug mode this is ``current_user.id``.  In debug mode the
        login is taken from the ``User`` header or, failing that, from the
        user part of an HTTP Basic ``Authorization`` header.

        Raises:
            Exception: in debug mode when neither header is present.
        """
        if not config.get('DEBUG', False):
            return current_user.id
        if "User" in request.headers:
            return request.headers.get("User")
        if "Authorization" in request.headers:
            raw = b64decode(request.headers.get('Authorization').split(' ')[1])
            username = raw.decode().split(':')[0]
            # Log only the user name: the decoded value also holds the
            # password, which must never reach the logs.
            app.logger.info('Basic authorization user: %s', username)
            return username
        app.logger.info(request.headers)
        raise Exception('username not supplied')

    def auth_login_required(f):
        """Decorate a view so that it requires an authenticated user.

        Outside debug mode this delegates to ``login_required``.  In debug
        mode any failure of ``auth_get_user`` is answered with a 401 Basic
        authentication challenge.
        """
        if not config.get('DEBUG', False):
            return login_required(f)

        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                auth_get_user()
            except Exception:
                app.logger.exception("auth get exception")
                response = make_response('', 401)
                response.headers['WWW-Authenticate'] = 'Basic realm="at.hackerspace.pl", charset="UTF-8"'
                return response
            return f(*args, **kwargs)
        return wrapper

    def restrict_ip(prefixes: Iterable[str] = (), exclude: Iterable[str] = ()):
        """Return a view decorator that only admits selected client addresses.

        A request is rejected with ``invalid_ip.html`` and status 403 when its
        address (see ``v4addr``) is listed in ``exclude`` or does not start
        with any of ``prefixes``.  With no prefixes every request is rejected.
        """
        def reject(r_addr):
            app.logger.info('got IP %s, rejecting', r_addr)
            return render_template('invalid_ip.html', ip_address=r_addr), 403

        def decorator(f):
            @wraps(f)
            def func(*a, **kw):
                r_addr = v4addr()
                if r_addr in exclude:
                    return reject(r_addr)
                # str.startswith accepts a tuple; an empty tuple never matches.
                if not r_addr.startswith(tuple(prefixes)):
                    return reject(r_addr)
                return f(*a, **kw)
            return func
        return decorator

    @app.template_filter('strfts')
    def strfts(ts, format='%d/%m/%Y %H:%M'):
        """Format a POSIX timestamp with ``strftime`` (local time)."""
        return datetime.fromtimestamp(ts).strftime(format)

    @app.template_filter('wikiurl')
    def wikiurl(user):
        """Return ``WIKI_URL`` with its ``${login}`` placeholder set to ``user``."""
        return app.config['WIKI_URL'].replace("${login}", user)

    @app.before_request
    def make_connection():
        """Open a per-request SQLite connection and store it as ``g.db``."""
        conn = sqlite3.connect(app.config['DB'])
        conn.row_factory = sqlite3.Row
        conn.isolation_level = None  # for autocommit mode
        g.db = conn

    @app.teardown_request
    def close_connection(exception):
        """Close the per-request connection, if one was opened."""
        # Teardown also runs when make_connection itself failed, in which
        # case g.db does not exist.
        conn = g.pop('db', None)
        if conn is not None:
            conn.close()

    @app.route('/')
    def main_view():
        """Render the main page from the current presence data."""
        return render_template('main.html', **now_at())

    @app.route('/metrics')
    def metrics_view():
        """Render count of different entities, per kind, in Prometheus format."""
        metric = 'checkinator_now_present_entities'
        # HELP and TYPE must name the metric they describe.
        lines = [
            '# HELP ' + metric + ' entities present at the hackerspace according to checkinator / DHCP',
            '# TYPE ' + metric + ' gauge',
        ]
        for kind, members in now_at().items():
            # The kind is pasted verbatim into a label value, so skip kinds
            # containing characters that would need escaping there.
            if any(ch in kind for ch in '"\\\n'):
                continue
            # Doubled braces are literal '{' / '}' in an f-string.
            lines.append(f'{metric}{{entity_kind="{kind}"}} {len(members)}')
        # The text exposition format requires a trailing line feed.
        return Response('\n'.join(lines) + '\n', mimetype='text/plain')

    @app.route('/api')
    def list_all():
        """Return present users and per-kind device counts as JSON."""
        data = now_at()

        def prettify_user(entry):
            user, atime = entry
            if user == 'greenmaker':
                user = 'dreammaker'
            return {
                'login': user,
                'timestamp': atime,
                'pretty_time': strfts(atime),
            }

        result = {'users': [prettify_user(entry) for entry in data.pop('users')]}
        result.update((kind, len(members)) for kind, members in data.items())
        res = make_response(json.dumps(result), 200)
        # The body is JSON; do not leave the default text/html content type.
        res.mimetype = 'application/json'
        res.headers['Access-Control-Allow-Origin'] = '*'
        return res

    def now_at():
        """Collect who and what is currently present.

        Returns:
            A dict with one set of hardware addresses per ``SPECIAL_DEVICES``
            name, an ``'unknown'`` set of unclaimed addresses matching no
            special prefix, and ``'users'``: a list of ``(owner, last_seen)``
            pairs sorted newest first.
        """
        result = dict()
        devices = app.updater.get_active_devices()
        macs = list(devices.keys())

        identified_devices = list(get_device_infos(g.db, macs))
        unknown = set(macs) - set(d.hwaddr for d in identified_devices)

        # Sort unclaimed devices into the configured special groups.
        for name, prefixes in app.config['SPECIAL_DEVICES'].items():
            prefixes = tuple(prefixes)  # startswith accepts tuple as argument
            matched = {hwaddr for hwaddr in unknown if hwaddr.startswith(prefixes)}
            result[name] = matched
            unknown -= matched

        result['unknown'] = unknown

        users = {}
        for info in identified_devices:
            # Ignored devices must not reveal their owner: skip them entirely
            # rather than listing the owner with a zero timestamp.
            if info.ignored:
                continue
            atime = devices[info.hwaddr].atime
            users[info.owner] = max(users.get(info.owner, 0), atime)

        result['users'] = sorted(
            users.items(), key=lambda item: (item[1], item[0]), reverse=True)

        return result

    restrict_to_hs = restrict_ip(prefixes=app.config['CLAIMABLE_PREFIXES'],
                                 exclude=app.config['CLAIMABLE_EXCLUDE'])

    @app.route('/claim', methods=['GET'])
    @restrict_to_hs
    @auth_login_required
    def claim_form():
        """Show the claim form for the device the request comes from."""
        hwaddr, name = app.updater.get_device(v4addr())
        return render_template('claim.html', hwaddr=hwaddr, name=name)

    @app.route('/my_ip', methods=['GET'])
    def get_my_ip():
        """Report the caller's address and the device data found for it."""
        ip = v4addr()
        hwaddr, name = app.updater.get_device(ip)
        return f'ip: {ip!r}\nhwaddr: {hwaddr!r}\nhostname: {name!r}\n'

    @app.route('/claim', methods=['POST'])
    @restrict_to_hs
    @auth_login_required
    def claim():
        """Register the requesting device as owned by the current user."""
        hwaddr, _lease_name = app.updater.get_device(v4addr())
        if not hwaddr:
            return render_template('post_claim.html', error='Invalid device.')
        login = auth_get_user()
        try:
            g.db.execute(
                'insert into devices (hwaddr, name, owner, ignored) values (?, ?, ?, ?)',
                [hwaddr, request.form['name'], login, False])
        except sqlite3.Error:
            return render_template(
                'post_claim.html',
                error='Could not add device! Perhaps someone claimed it?')
        return render_template('post_claim.html')

    def get_user_devices(conn, user):
        """Return a list of ``DeviceInfo`` for every device owned by ``user``."""
        rows = conn.execute(
            'select hwaddr, name, ignored from devices where owner = ?', [user])
        return [
            DeviceInfo(row['hwaddr'], row['name'], user, row['ignored'])
            for row in rows
        ]

    @app.route('/account', methods=['GET'])
    @auth_login_required
    def account():
        """Render the current user's device list."""
        devices = get_user_devices(g.db, auth_get_user())
        return render_template('account.html', devices=devices)

    def set_ignored(conn, hwaddr, user, value):
        """Set the ``ignored`` flag of ``user``'s device ``hwaddr``."""
        return conn.execute(
            'update devices set ignored = ? where hwaddr = ? and owner = ?',
            [value, hwaddr, user])

    def delete_device(conn, hwaddr, user):
        """Delete ``user``'s device ``hwaddr``."""
        return conn.execute(
            'delete from devices where hwaddr = ? and owner = ?',
            [hwaddr, user])

    @app.route('/devices/<id>/<action>/')
    @auth_login_required
    def device(id, action):
        """Apply ``hide``, ``show`` or ``delete`` to one of the user's devices.

        Unknown actions are ignored.  Always redirects to the account page.
        """
        # NOTE(review): these state changes are reachable via GET; the route
        # is kept as is because existing links presumably depend on it.
        user = auth_get_user()
        if action == 'hide':
            set_ignored(g.db, id, user, True)
        elif action == 'show':
            set_ignored(g.db, id, user, False)
        elif action == 'delete':
            delete_device(g.db, id, user)
        return redirect(url_for('account'))

    @app.route('/admin')
    @cap_required('staff')
    def admin():
        """Render the admin page from the current presence data."""
        data = now_at()
        return render_template('admin.html', data=data)

    @app.route('/static/css/basic.css')
    def css():
        """Serve the stylesheet, resolved against the working directory."""
        return send_file(str(Path('./static/css/basic.css').absolute()))

    return app