blob: 0c8bc0c4e4ac74e1163f6420830e53a36ac0d75c [file] [log] [blame]
vuko3cd087d2021-12-28 13:19:40 +01001"""Entry point for running flask application"""
2
3import at.web
4from at.dhcp import DhcpdUpdater
5from pathlib import Path
6import yaml
7import os
8import ipaddress
9from typing import Tuple, Optional, Dict
10
11import grpc
12from at.tracker_pb2 import ClientsRequest, HwAddrRequest
13from at.tracker_pb2_grpc import DhcpTrackerStub
14from at.dhcp import DhcpLease
15from datetime import datetime
16
17
18def format_mac(raw: bytes) -> str:
19 return ':'.join(f'{b:02x}' for b in raw)
20
21def mac_from_ipv6(address : ipaddress.IPv6Address):
22 if not isinstance(address, ipaddress.IPv6Address):
23 raise ValueError(f"not an IPv6 address: {address}")
24 raw = address.packed[8:]
25 if raw[3:5] != bytes([0xff, 0xfe]):
26 raise ValueError(f"not MAC based IPv6 Address: {address}")
27 mac = bytes([raw[0] ^ 0x02, *raw[1:3], *raw[5:]])
28 return mac
29
30class DevicesApi:
31 def __init__(self, grpc_channel):
32 self._api = DhcpTrackerStub(grpc_channel)
33
34
35 def get_active_devices(self) -> Dict[str, DhcpLease]:
36 devices = self._api.GetClients(ClientsRequest())
37 return {
38 format_mac(d.hw_address): DhcpLease(
39 hwaddr=format_mac(d.hw_address),
40 atime=datetime.fromisoformat(d.last_seen).timestamp(),
41 ip=d.ip_address,
42 name=d.client_hostname
43 ) for d in devices.clients
44 }
45
46 def get_device(self, ip: str) -> Tuple[Optional[str], Optional[str]]:
47 hw_address = self._api.GetHwAddr(HwAddrRequest(ip_address=ip)).hw_address
48 if hw_address is not None:
49 devices = self._api.GetClients(ClientsRequest())
50 for device in devices.clients:
51 if device.hw_address == hw_address:
52 return format_mac(hw_address), device.client_hostname
53 return format_mac(hw_address), ""
54
55 address = ipaddress.ip_address(ip)
56 if isinstance(address, ipaddress.IPv6Address):
57 try:
58 mac = mac_from_ipv6(address)
59 except ValueError:
60 pass
61 else:
62 return ( format_mac(mac), "" )
63
64 return None, None
65
66
67config_path = Path(os.environ.get("CHECKINATOR_WEB_CONFIG", 'web-config.yaml'))
68config = yaml.safe_load(config_path.read_text())
69config.update(yaml.safe_load(Path(config["SECRETS_FILE"]).read_text()))
70
71
72tls_address = config.get("GRPC_TLS_ADDRESS", False)
73unix_socket = config.get('GRPC_UNIX_SOCKET', False)
74if tls_address:
75 print("using secure channel")
76 ca_cert = Path(config.get('GRPC_TLS_CA_CERT')).read_bytes()
77 cert_dir = Path(config.get('GRPC_TLS_CERT_DIR'))
78
79 channel_credential = grpc.ssl_channel_credentials(
80 root_certificates = ca_cert,
81 private_key = cert_dir.joinpath('key.pem').read_bytes(),
82 certificate_chain = cert_dir.joinpath('cert.pem').read_bytes(),
83 )
84
85 options = [
86 ('grpc.ssl_target_name_override', 'at.customs.hackerspace.pl')
87 ]
88 channel = grpc.secure_channel(config.get('GRPC_TLS_ADDRESS'), channel_credential, options=options)
89elif unix_socket:
90 channel = grpc.insecure_channel(f'unix://{unix_socket}')
91else:
92 raise Exception("no GRPC_TLS_ADDRESS or GRPC_UNIX_SOCKET set in config file")
93
94app = at.web.app(Path(__file__).parent, DevicesApi(channel), config)
95
96def run_debug():
97 import argparse
98 parser = argparse.ArgumentParser()
99 parser.add_argument("--port", type=int, default=8080, help="http port")
100 parser.add_argument("--ip", type=str, default='127.0.0.1', help="http port")
101
102 args = parser.parse_args()
103
104 app.run(args.ip, args.port, debug=True)