check in checkinator into hswaw/checkinator

repository: https://code.hackerspace.pl/checkinator
revision: 713c7e6c1a8fd6147522c1a5e3067898a1d8bf7a

Change-Id: I1bd2975a46ec0d9a89d6594fb4b9d49832001627
Reviewed-on: https://gerrit.hackerspace.pl/c/hscloud/+/1219
Reviewed-by: q3k <q3k@hackerspace.pl>
diff --git a/hswaw/checkinator/at/webapp.py b/hswaw/checkinator/at/webapp.py
new file mode 100644
index 0000000..0c8bc0c
--- /dev/null
+++ b/hswaw/checkinator/at/webapp.py
@@ -0,0 +1,104 @@
+"""Entry point for running flask application"""
+
+import at.web 
+from at.dhcp import DhcpdUpdater
+from pathlib import Path
+import yaml
+import os
+import ipaddress
+from typing import Tuple, Optional, Dict
+
+import grpc
+from at.tracker_pb2 import ClientsRequest, HwAddrRequest
+from at.tracker_pb2_grpc import DhcpTrackerStub
+from at.dhcp import DhcpLease
+from datetime import datetime
+
+
+def format_mac(raw: bytes) -> str:  # e.g. b'\x00\x1a\xff' -> '00:1a:ff'
+    return ':'.join('%02x' % octet for octet in raw)
+
+def mac_from_ipv6(address: ipaddress.IPv6Address) -> bytes:
+    """Extract the MAC bytes from a MAC-based IPv6 address, else raise ValueError."""
+    if not isinstance(address, ipaddress.IPv6Address):
+        raise ValueError(f"not an IPv6 address: {address}")
+    iface_id = address.packed[8:]  # low 8 bytes of the 16-byte address
+    if iface_id[3:5] != b'\xff\xfe':  # filler bytes a MAC-derived identifier carries
+        raise ValueError(f"not MAC based IPv6 Address: {address}")
+    return bytes([iface_id[0] ^ 0x02, *iface_id[1:3], *iface_id[5:]])  # drop filler, flip bit 0x02
+
+class DevicesApi:
+    """Look up devices through the DhcpTracker gRPC service."""
+
+    def __init__(self, grpc_channel):
+        self._api = DhcpTrackerStub(grpc_channel)
+
+    def get_active_devices(self) -> Dict[str, DhcpLease]:
+        """Return a DhcpLease for each client listed by GetClients, keyed by formatted MAC."""
+        devices = self._api.GetClients(ClientsRequest())
+        return {
+            format_mac(d.hw_address): DhcpLease(
+                hwaddr=format_mac(d.hw_address),
+                atime=datetime.fromisoformat(d.last_seen).timestamp(),
+                ip=d.ip_address,
+                name=d.client_hostname
+            ) for d in devices.clients
+        }
+
+    def get_device(self, ip: str) -> Tuple[Optional[str], Optional[str]]:
+        """Resolve ip to a (mac, hostname) pair, or (None, None) if no MAC is found.
+
+        hostname is "" when GetClients lists no client with that MAC, and
+        when the MAC was derived from the IPv6 address itself.
+        """
+        hw_address = self._api.GetHwAddr(HwAddrRequest(ip_address=ip)).hw_address
+        # A protobuf bytes field is never None (unset reads as b""), so test truthiness.
+        if hw_address:
+            clients = self._api.GetClients(ClientsRequest()).clients
+            name = next((c.client_hostname for c in clients if c.hw_address == hw_address), "")
+            return format_mac(hw_address), name
+        address = ipaddress.ip_address(ip)
+        try:  # mac_from_ipv6 raises ValueError for IPv4 and for non MAC-based IPv6
+            return format_mac(mac_from_ipv6(address)), ""
+        except ValueError:
+            return None, None
+
+
+# Load the web config, then overlay the secrets file it names via SECRETS_FILE.
+config_path = Path(os.environ.get("CHECKINATOR_WEB_CONFIG", 'web-config.yaml'))
+config = yaml.safe_load(config_path.read_text())
+config.update(yaml.safe_load(Path(config["SECRETS_FILE"]).read_text()))
+
+# Choose the gRPC transport; the TLS address wins when both settings are present.
+tls_address = config.get("GRPC_TLS_ADDRESS", False)
+unix_socket = config.get('GRPC_UNIX_SOCKET', False)
+if tls_address:
+    print("using secure channel")
+    ca_cert = Path(config.get('GRPC_TLS_CA_CERT')).read_bytes()
+    cert_dir = Path(config.get('GRPC_TLS_CERT_DIR'))
+    # The client key and certificate chain are read from fixed names in cert_dir.
+    channel_credential = grpc.ssl_channel_credentials(
+        root_certificates=ca_cert,
+        private_key=(cert_dir / 'key.pem').read_bytes(),
+        certificate_chain=(cert_dir / 'cert.pem').read_bytes(),
+    )
+    # Check the server certificate against this fixed name, not the dialed address.
+    options = [('grpc.ssl_target_name_override', 'at.customs.hackerspace.pl')]
+    channel = grpc.secure_channel(tls_address, channel_credential, options=options)
+elif unix_socket:
+    channel = grpc.insecure_channel(f'unix://{unix_socket}')
+else:
+    raise Exception("no GRPC_TLS_ADDRESS or GRPC_UNIX_SOCKET set in config file")
+
+# Application object built by at.web.app; run_debug() below serves it.
+app = at.web.app(Path(__file__).parent, DevicesApi(channel), config)
+
+def run_debug():
+    """Parse --port/--ip from the command line and run the app with debug=True."""
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--port", type=int, default=8080, help="http port")
+    parser.add_argument("--ip", type=str, default='127.0.0.1', help="http bind address")
+
+    args = parser.parse_args()
+    app.run(args.ip, args.port, debug=True)