| """Entry point for running flask application""" |
| |
| import at.web |
| from at.dhcp import DhcpdUpdater |
| from pathlib import Path |
| import yaml |
| import os |
| import ipaddress |
| from typing import Tuple, Optional, Dict |
| |
| import grpc |
| from at.tracker_pb2 import ClientsRequest, HwAddrRequest |
| from at.tracker_pb2_grpc import DhcpTrackerStub |
| from at.dhcp import DhcpLease |
| from datetime import datetime |
| |
| |
| def format_mac(raw: bytes) -> str: |
| return ':'.join(f'{b:02x}' for b in raw) |
| |
| def mac_from_ipv6(address : ipaddress.IPv6Address): |
| if not isinstance(address, ipaddress.IPv6Address): |
| raise ValueError(f"not an IPv6 address: {address}") |
| raw = address.packed[8:] |
| if raw[3:5] != bytes([0xff, 0xfe]): |
| raise ValueError(f"not MAC based IPv6 Address: {address}") |
| mac = bytes([raw[0] ^ 0x02, *raw[1:3], *raw[5:]]) |
| return mac |
| |
| class DevicesApi: |
| def __init__(self, grpc_channel): |
| self._api = DhcpTrackerStub(grpc_channel) |
| |
| |
| def get_active_devices(self) -> Dict[str, DhcpLease]: |
| devices = self._api.GetClients(ClientsRequest()) |
| return { |
| format_mac(d.hw_address): DhcpLease( |
| hwaddr=format_mac(d.hw_address), |
| atime=datetime.fromisoformat(d.last_seen).timestamp(), |
| ip=d.ip_address, |
| name=d.client_hostname |
| ) for d in devices.clients |
| } |
| |
| def get_device(self, ip: str) -> Tuple[Optional[str], Optional[str]]: |
| hw_address = self._api.GetHwAddr(HwAddrRequest(ip_address=ip)).hw_address |
| if hw_address is not None: |
| devices = self._api.GetClients(ClientsRequest()) |
| for device in devices.clients: |
| if device.hw_address == hw_address: |
| return format_mac(hw_address), device.client_hostname |
| return format_mac(hw_address), "" |
| |
| address = ipaddress.ip_address(ip) |
| if isinstance(address, ipaddress.IPv6Address): |
| try: |
| mac = mac_from_ipv6(address) |
| except ValueError: |
| pass |
| else: |
| return ( format_mac(mac), "" ) |
| |
| return None, None |
| |
| |
| config_path = Path(os.environ.get("CHECKINATOR_WEB_CONFIG", 'web-config.yaml')) |
| config = yaml.safe_load(config_path.read_text()) |
| config.update(yaml.safe_load(Path(config["SECRETS_FILE"]).read_text())) |
| |
| |
| tls_address = config.get("GRPC_TLS_ADDRESS", False) |
| unix_socket = config.get('GRPC_UNIX_SOCKET', False) |
| if tls_address: |
| print("using secure channel") |
| ca_cert = Path(config.get('GRPC_TLS_CA_CERT')).read_bytes() |
| cert_dir = Path(config.get('GRPC_TLS_CERT_DIR')) |
| |
| channel_credential = grpc.ssl_channel_credentials( |
| root_certificates = ca_cert, |
| private_key = cert_dir.joinpath('key.pem').read_bytes(), |
| certificate_chain = cert_dir.joinpath('cert.pem').read_bytes(), |
| ) |
| |
| options = [ |
| ('grpc.ssl_target_name_override', 'at.customs.hackerspace.pl') |
| ] |
| channel = grpc.secure_channel(config.get('GRPC_TLS_ADDRESS'), channel_credential, options=options) |
| elif unix_socket: |
| channel = grpc.insecure_channel(f'unix://{unix_socket}') |
| else: |
| raise Exception("no GRPC_TLS_ADDRESS or GRPC_UNIX_SOCKET set in config file") |
| |
| app = at.web.app(Path(__file__).parent, DevicesApi(channel), config) |
| |
| def run_debug(): |
| import argparse |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--port", type=int, default=8080, help="http port") |
| parser.add_argument("--ip", type=str, default='127.0.0.1', help="http port") |
| |
| args = parser.parse_args() |
| |
| app.run(args.ip, args.port, debug=True) |