check in checkinator into hswaw/checkinator
repository: https://code.hackerspace.pl/checkinator
revision: 713c7e6c1a8fd6147522c1a5e3067898a1d8bf7a
Change-Id: I1bd2975a46ec0d9a89d6594fb4b9d49832001627
Reviewed-on: https://gerrit.hackerspace.pl/c/hscloud/+/1219
Reviewed-by: q3k <q3k@hackerspace.pl>
diff --git a/hswaw/checkinator/at/webapp.py b/hswaw/checkinator/at/webapp.py
new file mode 100644
index 0000000..0c8bc0c
--- /dev/null
+++ b/hswaw/checkinator/at/webapp.py
@@ -0,0 +1,104 @@
+"""Entry point for running flask application"""
+
+import at.web
+from at.dhcp import DhcpdUpdater
+from pathlib import Path
+import yaml
+import os
+import ipaddress
+from typing import Tuple, Optional, Dict
+
+import grpc
+from at.tracker_pb2 import ClientsRequest, HwAddrRequest
+from at.tracker_pb2_grpc import DhcpTrackerStub
+from at.dhcp import DhcpLease
+from datetime import datetime
+
+
+def format_mac(raw: bytes) -> str:
+    """Render raw hardware-address bytes as lowercase colon-separated hex.
+
+    Each byte becomes two hex digits; an empty input gives an empty string.
+    """
+    octets = (format(octet, '02x') for octet in raw)
+    return ':'.join(octets)
+
+def mac_from_ipv6(address: ipaddress.IPv6Address) -> bytes:
+    """Recover the 6-byte MAC embedded in a MAC-based IPv6 address.
+
+    The lower 8 bytes of the address must carry ``ff:fe`` in positions 3-4;
+    those two bytes are dropped and bit 0x02 of the first byte is flipped.
+
+    Raises:
+        ValueError: if ``address`` is not an ``IPv6Address`` or its lower
+            8 bytes do not contain the ``ff:fe`` marker.
+    """
+    if not isinstance(address, ipaddress.IPv6Address):
+        raise ValueError(f"not an IPv6 address: {address}")
+
+    # Interface identifier: the last 8 of the 16 packed bytes.
+    interface_id = address.packed[-8:]
+    if interface_id[3:5] != b'\xff\xfe':
+        raise ValueError(f"not MAC based IPv6 Address: {address}")
+
+    first_octet = interface_id[0] ^ 0x02
+    return bytes([first_octet]) + interface_id[1:3] + interface_id[5:]
+
+class DevicesApi:
+    """Thin wrapper around the ``DhcpTrackerStub`` gRPC client.
+
+    Converts tracker responses into MAC-string based values.
+    """
+
+    def __init__(self, grpc_channel):
+        """Create the tracker stub on top of ``grpc_channel``."""
+        self._api = DhcpTrackerStub(grpc_channel)
+
+    def get_active_devices(self) -> Dict[str, DhcpLease]:
+        """Return the tracker's current clients as leases keyed by MAC string.
+
+        ``atime`` is the POSIX timestamp parsed from the client's ISO-format
+        ``last_seen`` value.
+        """
+        leases = {}
+        for client in self._api.GetClients(ClientsRequest()).clients:
+            mac = format_mac(client.hw_address)
+            leases[mac] = DhcpLease(
+                hwaddr=mac,
+                atime=datetime.fromisoformat(client.last_seen).timestamp(),
+                ip=client.ip_address,
+                name=client.client_hostname,
+            )
+        return leases
+
+    def get_device(self, ip: str) -> Tuple[Optional[str], Optional[str]]:
+        """Resolve ``ip`` to a ``(mac, hostname)`` pair.
+
+        Returns:
+            * ``(mac, hostname)`` when the tracker knows the hardware address
+              and a matching client entry exists;
+            * ``(mac, "")`` when the hardware address is known but no client
+              entry matches, or when the MAC was derived from a MAC-based
+              IPv6 address;
+            * ``(None, None)`` when nothing could be determined.
+
+        Raises:
+            ValueError: if the tracker returned no hardware address and
+                ``ip`` is not a parseable IP address.
+        """
+        hw_address = self._api.GetHwAddr(HwAddrRequest(ip_address=ip)).hw_address
+        # A protobuf bytes field is never None; when unset it reads as b"".
+        # Test truthiness so an empty answer reaches the IPv6 fallback below
+        # instead of being reported as the MAC "".
+        if hw_address:
+            mac = format_mac(hw_address)
+            for device in self._api.GetClients(ClientsRequest()).clients:
+                if device.hw_address == hw_address:
+                    return mac, device.client_hostname
+            return mac, ""
+
+        # Fallback: a MAC-based IPv6 address carries the MAC itself.
+        address = ipaddress.ip_address(ip)
+        if isinstance(address, ipaddress.IPv6Address):
+            try:
+                derived = mac_from_ipv6(address)
+            except ValueError:
+                # Not a MAC-based address; fall through to "unknown".
+                pass
+            else:
+                return format_mac(derived), ""
+
+        return None, None
+
+
+# Main YAML config; its location can be overridden via CHECKINATOR_WEB_CONFIG.
+config_path = Path(os.getenv("CHECKINATOR_WEB_CONFIG", 'web-config.yaml'))
+config = yaml.safe_load(config_path.read_text())
+
+# Overlay the keys from the YAML file named by SECRETS_FILE onto the config.
+_secrets_path = Path(config["SECRETS_FILE"])
+config.update(yaml.safe_load(_secrets_path.read_text()))
+
+
+# Choose the gRPC transport: TLS when GRPC_TLS_ADDRESS is configured,
+# otherwise a unix socket; refuse to start when neither is set.
+tls_address = config.get("GRPC_TLS_ADDRESS", False)
+unix_socket = config.get('GRPC_UNIX_SOCKET', False)
+if not (tls_address or unix_socket):
+    raise Exception("no GRPC_TLS_ADDRESS or GRPC_UNIX_SOCKET set in config file")
+
+if tls_address:
+    print("using secure channel")
+    ca_cert = Path(config.get('GRPC_TLS_CA_CERT')).read_bytes()
+    cert_dir = Path(config.get('GRPC_TLS_CERT_DIR'))
+
+    # Client key and certificate are read from key.pem / cert.pem in cert_dir.
+    channel_credential = grpc.ssl_channel_credentials(
+        root_certificates=ca_cert,
+        private_key=(cert_dir / 'key.pem').read_bytes(),
+        certificate_chain=(cert_dir / 'cert.pem').read_bytes(),
+    )
+
+    # NOTE(review): presumably makes TLS host-name checking use this fixed
+    # name rather than the dialled address - confirm against gRPC docs.
+    options = [('grpc.ssl_target_name_override', 'at.customs.hackerspace.pl')]
+    channel = grpc.secure_channel(tls_address, channel_credential, options=options)
+else:
+    channel = grpc.insecure_channel(f'unix://{unix_socket}')
+
+# Application object, built from this file's directory, the tracker-backed
+# devices API and the merged config.
+app = at.web.app(
+    Path(__file__).parent,
+    DevicesApi(channel),
+    config,
+)
+
+def run_debug():
+    """Parse ``--ip`` / ``--port`` from the command line and run ``app``.
+
+    The application is started with ``debug=True``; defaults are
+    127.0.0.1 and port 8080.
+    """
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--port", type=int, default=8080, help="http port")
+    # Help text previously duplicated the --port description.
+    parser.add_argument("--ip", type=str, default='127.0.0.1', help="http bind address")
+
+    args = parser.parse_args()
+
+    app.run(args.ip, args.port, debug=True)