blob: 4ec9e706131d339c49a83b386fb01ef307f843ff [file] [log] [blame]
from at.dhcp import KeaUpdater, DhcpdUpdater, DhcpLease
from pathlib import Path
import yaml
import grpc
import json
import re
import subprocess
import logging
from concurrent import futures
from datetime import datetime, timezone
from .tracker_pb2 import DhcpClient, DhcpClients, HwAddrResponse
from .tracker_pb2_grpc import DhcpTrackerServicer, add_DhcpTrackerServicer_to_server
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", help="output more info", action="store_true")
parser.add_argument("config", type=Path, help="input file")
logging.basicConfig(level=logging.INFO)
def lease_to_client(lease: DhcpLease) -> DhcpClient:
return DhcpClient(
hw_address = bytes.fromhex(lease.hwaddr.replace(':', '')),
last_seen = datetime.utcfromtimestamp(lease.atime).replace(
tzinfo=timezone.utc).isoformat(),
client_hostname = lease.name,
ip_address = lease.ip
)
class DhcpTrackerServicer(DhcpTrackerServicer):
def __init__(self, tracker: DhcpdUpdater, *args, **kwargs):
self._tracker = tracker
super().__init__(*args, **kwargs)
def _authorize(self, context):
auth = context.auth_context()
ctype = auth.get('transport_security_type', 'local')
print(ctype)
if ctype == [b'ssl']:
if b'at.hackerspace.pl' not in context.peer_identities():
context.abort(
grpc.StatusCode.PERMISSION_DENIED,
(
"Only at.hackespace.pl is allowed to access raw "
"clients addresses"
)
)
elif ctype == 'local':
# connection from local unix socket is trusted by default
pass
else:
context.abort(
grpc.StatusCode.PERMISSION_DENIED,
f"Unknown transport type: {ctype}"
)
def GetClients(self, request, context):
self._authorize(context)
clients = [
lease_to_client(c) for c in self._tracker.get_active_devices().values()]
return DhcpClients(clients = clients)
def GetHwAddr(self, request, context):
self._authorize(context)
ip_address = str(request.ip_address)
if not re.fullmatch('[0-9a-fA-F:.]*', ip_address):
raise ValueError(f'Invalid ip address: {ip_address!r}')
logging.info(f'running ip neigh on {ip_address}')
r = subprocess.run(['ip', '-json', 'neigh', 'show', ip_address], check=True, capture_output=True)
neighs = json.loads(r.stdout)
if neighs:
return HwAddrResponse(hw_address=bytes.fromhex(neighs[0]['lladdr'].replace(':', '')))
return HwAddrResponse(hw_address=None)
def server():
args = parser.parse_args()
config = yaml.safe_load(args.config.read_text())
if config['DHCP_SERVER'] == "kea":
tracker = KeaUpdater(config['KEA_LEASE_FILE'], config['TIMEOUT'])
elif config['DHCP_SERVER'] == "isc" or "DHCP_SERVER" not in config:
tracker = DhcpdUpdater(config['LEASE_FILE'], config['TIMEOUT'])
tracker.start()
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
add_DhcpTrackerServicer_to_server(DhcpTrackerServicer(tracker), server)
tls_address = config.get("GRPC_TLS_ADDRESS", None)
if tls_address:
cert_dir = Path(config.get('GRPC_TLS_CERT_DIR', 'cert'))
ca_cert = Path(config.get('GRPC_TLS_CA_CERT', 'ca.pem')).read_bytes()
server_credentials = grpc.ssl_server_credentials(
private_key_certificate_chain_pairs = ((
cert_dir.joinpath('key.pem').read_bytes(),
cert_dir.joinpath('cert.pem').read_bytes()
),),
root_certificates = ca_cert,
require_client_auth = True
)
server.add_secure_port(config.get('GRPC_TLS_ADDRESS', '[::]:2847'), server_credentials)
unix_socket = config.get('GRPC_UNIX_SOCKET', False)
if unix_socket:
server.add_insecure_port(f'unix://{unix_socket}')
if tls_address or unix_socket:
print('starting grpc server ...')
server.start()
server.wait_for_termination()