blob: 0c8bc0c4e4ac74e1163f6420830e53a36ac0d75c [file] [log] [blame]
"""Entry point for running flask application"""
import at.web
from at.dhcp import DhcpdUpdater
from pathlib import Path
import yaml
import os
import ipaddress
from typing import Tuple, Optional, Dict
import grpc
from at.tracker_pb2 import ClientsRequest, HwAddrRequest
from at.tracker_pb2_grpc import DhcpTrackerStub
from at.dhcp import DhcpLease
from datetime import datetime
def format_mac(raw: bytes) -> str:
return ':'.join(f'{b:02x}' for b in raw)
def mac_from_ipv6(address : ipaddress.IPv6Address):
if not isinstance(address, ipaddress.IPv6Address):
raise ValueError(f"not an IPv6 address: {address}")
raw = address.packed[8:]
if raw[3:5] != bytes([0xff, 0xfe]):
raise ValueError(f"not MAC based IPv6 Address: {address}")
mac = bytes([raw[0] ^ 0x02, *raw[1:3], *raw[5:]])
return mac
class DevicesApi:
def __init__(self, grpc_channel):
self._api = DhcpTrackerStub(grpc_channel)
def get_active_devices(self) -> Dict[str, DhcpLease]:
devices = self._api.GetClients(ClientsRequest())
return {
format_mac(d.hw_address): DhcpLease(
hwaddr=format_mac(d.hw_address),
atime=datetime.fromisoformat(d.last_seen).timestamp(),
ip=d.ip_address,
name=d.client_hostname
) for d in devices.clients
}
def get_device(self, ip: str) -> Tuple[Optional[str], Optional[str]]:
hw_address = self._api.GetHwAddr(HwAddrRequest(ip_address=ip)).hw_address
if hw_address is not None:
devices = self._api.GetClients(ClientsRequest())
for device in devices.clients:
if device.hw_address == hw_address:
return format_mac(hw_address), device.client_hostname
return format_mac(hw_address), ""
address = ipaddress.ip_address(ip)
if isinstance(address, ipaddress.IPv6Address):
try:
mac = mac_from_ipv6(address)
except ValueError:
pass
else:
return ( format_mac(mac), "" )
return None, None
config_path = Path(os.environ.get("CHECKINATOR_WEB_CONFIG", 'web-config.yaml'))
config = yaml.safe_load(config_path.read_text())
config.update(yaml.safe_load(Path(config["SECRETS_FILE"]).read_text()))
tls_address = config.get("GRPC_TLS_ADDRESS", False)
unix_socket = config.get('GRPC_UNIX_SOCKET', False)
if tls_address:
print("using secure channel")
ca_cert = Path(config.get('GRPC_TLS_CA_CERT')).read_bytes()
cert_dir = Path(config.get('GRPC_TLS_CERT_DIR'))
channel_credential = grpc.ssl_channel_credentials(
root_certificates = ca_cert,
private_key = cert_dir.joinpath('key.pem').read_bytes(),
certificate_chain = cert_dir.joinpath('cert.pem').read_bytes(),
)
options = [
('grpc.ssl_target_name_override', 'at.customs.hackerspace.pl')
]
channel = grpc.secure_channel(config.get('GRPC_TLS_ADDRESS'), channel_credential, options=options)
elif unix_socket:
channel = grpc.insecure_channel(f'unix://{unix_socket}')
else:
raise Exception("no GRPC_TLS_ADDRESS or GRPC_UNIX_SOCKET set in config file")
app = at.web.app(Path(__file__).parent, DevicesApi(channel), config)
def run_debug():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8080, help="http port")
parser.add_argument("--ip", type=str, default='127.0.0.1', help="http port")
args = parser.parse_args()
app.run(args.ip, args.port, debug=True)