vuko | 3cd087d | 2021-12-28 13:19:40 +0100 | [diff] [blame] | 1 | import argparse |
| 2 | from pathlib import Path |
| 3 | from at.dhcp import parse_isc_dhcpd_leases |
| 4 | from time import time |
| 5 | |
def list():
    """Print the devices found in an ISC dhcpd leases file.

    Command-line arguments:
        leases: path to the leases file to parse.
        --timeout: optional limit in minutes; when given it is multiplied by
            60 and handed to ``purge_stale`` before anything is printed.

    Every device is printed with its ``atime`` field replaced by
    ``time() - atime``.
    """
    # NOTE(review): the name shadows the builtin ``list``; it is kept because
    # it is this function's public name -- presumably referenced elsewhere.
    cli = argparse.ArgumentParser()
    cli.add_argument("leases", type=Path, help="leases file")
    cli.add_argument("--timeout", type=int, default=None, help="timeout in minutes")
    options = cli.parse_args()

    with open(options.leases) as leases_file:
        # The first element of the parser's result is not needed here.
        _offset, known = parse_isc_dhcpd_leases(leases_file)

    timeout_minutes = options.timeout
    if timeout_minutes is not None:
        known.purge_stale(timeout_minutes * 60)

    print("Found devices:")
    for entry in known._devices.values():
        elapsed = time() - entry.atime
        print(entry._replace(atime=elapsed))
| 19 | |
| 20 | import grpc |
| 21 | from .tracker_pb2 import ClientsRequest |
| 22 | from .tracker_pb2_grpc import DhcpTrackerStub |
| 23 | |
def format_mac(raw: bytes) -> str:
    """Return *raw* as lowercase two-digit hex octets joined by colons."""
    octets = map('{:02x}'.format, raw)
    return ':'.join(octets)
| 26 | |
def tracker_list():
    """Ask a tracker over gRPC for its clients and print one line per client.

    Command-line arguments:
        address: optional gRPC target, defaulting to
            ``unix:///tmp/checkinator.sock``.
        --timeout: accepted (in minutes) but not used by this function.

    Each output line holds the colon-separated hardware address followed by
    the client's ``last_seen``, ``ip_address`` and ``client_hostname`` fields.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "address",
        default='unix:///tmp/checkinator.sock',
        nargs='?',
        help="tracker grpc address",
    )
    cli.add_argument("--timeout", type=int, default=None, help="timeout in minutes")
    options = cli.parse_args()

    with grpc.insecure_channel(options.address) as channel:
        tracker = DhcpTrackerStub(channel)
        reply = tracker.GetClients(ClientsRequest())
        for entry in reply.clients:
            columns = (
                format_mac(entry.hw_address),
                entry.last_seen,
                entry.ip_address,
                entry.client_hostname,
            )
            print(*columns)