| import threading |
| import os |
| import io |
| |
| import logging |
| |
| from typing import Tuple, List, Optional, NamedTuple |
| from time import sleep, time, mktime |
| from datetime import datetime, timezone |
| |
| logger = logging.getLogger(__name__) |
| |
def strfts(ts, format='%d/%m/%Y %H:%M'):
    """Render a POSIX timestamp as a human-readable local-time string.

    Args:
        ts: Seconds since the epoch.
        format: ``strftime`` pattern; defaults to day/month/year hour:minute.

    Returns: The formatted date string (naive local time).
    """
    moment = datetime.fromtimestamp(ts)
    return moment.strftime(format)
| |
class DhcpLease(NamedTuple):
    """Immutable record describing one DHCP lease seen for a device."""
    # Hardware address of the client; ActiveDevices uses it as the dict key.
    hwaddr: Optional[str]
    # Lease timestamp in seconds since the epoch; ActiveDevices.add() fills
    # in the current time when this is None.
    atime: Optional[float]
    # IP address handed out in the lease.
    ip: Optional[str]
    # Host name reported by the client, if any.
    name: Optional[str]
| |
| |
class ActiveDevices:
    """Registry holding the most recent lease seen per hardware address."""

    def __init__(self):
        # Maps hwaddr -> lease with the newest atime recorded for it.
        self._devices = {}

    def purge_stale(self, timeout):
        """Remove every lease whose atime is more than ``timeout`` seconds old.

        Args:
            timeout: Maximum allowed age in seconds.
        """
        now = time()
        # Collect the keys first so the dict is not resized while iterating.
        stale = [hwaddr for hwaddr, lease in self._devices.items()
                 if now - lease.atime > timeout]
        for hwaddr in stale:
            del self._devices[hwaddr]

    def add(self, lease: "DhcpLease") -> bool:
        """Record ``lease`` unless a lease at least as recent is already stored.

        A lease without an atime is stamped with the current time first.

        Args:
            lease: Lease to record; keyed by its ``hwaddr``.

        Returns: True when the lease was stored, False when it was ignored.
        """
        if lease.atime is None:
            lease = lease._replace(atime=time())
        known = self._devices.get(lease.hwaddr)
        if known is None or known.atime < lease.atime:
            self._devices[lease.hwaddr] = lease
            return True
        return False

    def update(self, devices: "ActiveDevices") -> List["DhcpLease"]:
        """Add entries from another ActiveDevices instance.

        Args:
            devices: Registry whose leases should be merged into this one.

        Returns: List of the leases from ``devices`` that were actually
            stored (new hardware address or newer atime).
        """
        return [lease for lease in devices._devices.values()
                if self.add(lease)]
| |
class Updater(threading.Thread):
    """Base daemon thread owning a lock-protected registry of active devices.

    ``self.active`` is only touched while ``self.lock`` is held. The lock is a
    plain ``threading.Lock`` (not reentrant), so methods that take it must not
    call each other while holding it.
    """

    def __init__(self, timeout, logger=logger, *a, **kw):
        """Set up the registry and configure the thread as a daemon.

        Args:
            timeout: Age in seconds after which a device counts as stale.
            logger: Logger used for update messages.
            *a, **kw: Forwarded to ``threading.Thread.__init__``.
        """
        self.timeout = timeout
        self.lock = threading.Lock()
        self.logger = logger
        self.active = ActiveDevices()
        threading.Thread.__init__(self, *a, **kw)
        # Always a daemon thread, regardless of what was passed in kw.
        self.daemon = True

    def get_active_devices(self):
        """Purge stale entries and return a snapshot of the registry.

        Returns: A new dict mapping hwaddr to lease; safe to use without
            holding the lock.
        """
        with self.lock:
            self.active.purge_stale(self.timeout)
            return dict(self.active._devices)

    def get_device(self, ip):
        """Look up the active device currently holding ``ip``.

        Args:
            ip: IP address to search for.

        Returns: ``(hwaddr, name)`` of the first matching device, or
            ``(None, None)`` when no active device has that address.
        """
        # Do NOT take self.lock here: get_active_devices() acquires it itself
        # and threading.Lock is not reentrant, so nesting would deadlock.
        # The returned dict is a private copy, so no lock is needed to scan it.
        for device in self.get_active_devices().values():
            if device.ip == ip:
                return device.hwaddr, device.name
        return None, None

    def update(self, devices: ActiveDevices):
        """Merge ``devices`` into the registry, logging every change.

        Args:
            devices: Freshly parsed leases to merge.
        """
        for device in devices._devices.values():
            # Lock per device so readers are not blocked for the whole batch.
            with self.lock:
                changed = self.active.add(device)
            if changed:
                self.logger.info('updated %s with atime %s and ip %s',
                                 device.hwaddr, strfts(device.atime), device.ip)
| |
| #class CapUpdater(Updater): |
| # def __init__(self, cap_file, *a, **kw): |
| # self.cap_file = cap_file |
| # Updater.__init__(self, *a, **kw) |
| # |
| # def run(self): |
| # while True: |
| # try: |
| # with open(self.cap_file, 'r', buffering=0) as f: |
| # self.logger.info('Updater ready on cap file %s', |
| # self.cap_file) |
| # lines = [l.strip() for l in f.read().split('\n')] |
| # for hwaddr in lines: |
| # if hwaddr: |
| # self.update(hwaddr) |
| # self.logger.warning('Cap file %s closed, reopening', |
| # self.cap_file) |
| # except Exception as e: |
| # self.logger.error('Updater got an exception:\n' + |
| # traceback.format_exc(e)) |
| # sleep(10.0) |
| |
| |
class MtimeUpdater(Updater):
    """Updater that polls a lease file and reacts when it is modified.

    Attributes set here:
        lease_file: Path of the lease file being watched.
        position: Offset handed to ``seek`` before the next parse.
        last_modified: mtime of the lease file at the last processed update.
    """

    def __init__(self, lease_file, *a, **kw):
        self.lease_file = lease_file
        self.position = 0
        self.last_modified = 0
        super().__init__(*a, **kw)

    def file_changed(self, f):
        """Callback invoked with the lease file after it changed.

        Subclasses override this to parse new content. This base version
        parses nothing and just reports the current offset.

        Args:
            f: Opened lease file, already positioned at ``self.position`` so
                previously parsed lines can be skipped.

        Returns: New offset pointing just past the last parsed byte.
        """
        current_offset = f.tell()
        return current_offset

    def _trigger_update(self):
        """Open the lease file, resume at the stored offset and parse it."""
        self.logger.info('Lease file changed, updating')
        with open(self.lease_file, 'r') as lease_stream:
            lease_stream.seek(self.position)
            next_position = self.file_changed(lease_stream)
            self.position = next_position

    def run(self):
        """Poll the lease file forever and parse it whenever it changes.

        From the ISC DHCPD manual:

            New leases are appended to the end of the dhcpd.leases file. In
            order to prevent the file from becoming arbitrarily large, from
            time to time dhcpd creates a new dhcpd.leases file from its
            in-core lease database. Once this file has been written to disk,
            the old file is renamed dhcpd.leases~, and the new file is
            renamed dhcpd.leases.

        The loop therefore rewinds to offset 0 when the file shrank or when
        the ``~`` backup is newer than the last processed modification.
        Any exception is logged and polling resumes after a longer pause.
        """
        while True:
            try:
                info = os.stat(self.lease_file)
                # A file shorter than our offset must have been replaced.
                if self.position > info.st_size:
                    self.logger.info('leases file changed - reseting pointer')
                    self.position = 0
                backup_path = self.lease_file + '~'
                try:
                    # Detect a dhcpd cleanup via the backup file's mtime.
                    # The original author noted that cleanup during operation
                    # seemed broken on their deployment ("customs"), so this
                    # branch may never actually run there.
                    if os.stat(backup_path).st_mtime > self.last_modified:
                        self.logger.info('leases file purged - reseting pointer')
                        self.position = 0
                except FileNotFoundError:
                    # No backup file yet: nothing has been rotated.
                    pass
                if info.st_mtime > self.last_modified:
                    self._trigger_update()
                    # Only remembered after a successful parse, so a failed
                    # update is retried on the next iteration.
                    self.last_modified = info.st_mtime
                sleep(5.0)
            except Exception:
                self.logger.exception('Exception in updater')
                sleep(10.0)
| |
| |
class DnsmasqUpdater(MtimeUpdater):
    """Placeholder updater for dnsmasq lease files; parsing is disabled."""

    def file_changed(self, f):
        """Refuse to parse: dnsmasq support is currently disabled.

        Args:
            f: Opened lease file (unused).

        Raises:
            NotImplementedError: Always. The dnsmasq parser was never
                re-tested after differential (offset-based) updates were
                introduced.
        """
        # The unreachable parsing loop that used to follow this raise was
        # removed: it called self.update() with separate
        # (hwaddr, ts, ip, name) arguments, while update() takes a single
        # ActiveDevices argument. A future implementation has to build an
        # ActiveDevices instance from the parsed lines and return the new
        # file offset.
        raise NotImplementedError(
            "This was not tested after adding differential update")
| |
def _strip_lease_comment(line: str) -> str:
    """Return ``line`` cut at the first ``#`` that is outside double quotes."""
    quoted = False
    escaped = False
    for index, char in enumerate(line):
        if escaped:
            # The character after a backslash inside quotes is literal.
            escaped = False
        elif quoted and char == '\\':
            escaped = True
        elif char == '"':
            quoted = not quoted
        elif char == '#' and not quoted:
            return line[:index]
    return line


def _unquote_lease_value(value: str) -> str:
    """Drop the trailing ``;`` and the surrounding double quotes, if any."""
    value = value.strip()
    if value.endswith(';'):
        value = value[:-1].rstrip()
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        value = value[1:-1]
    return value


def parse_isc_dhcpd_leases(leases_file: io.TextIOBase) -> Tuple[int, ActiveDevices]:
    """Parse ISC dhcpd server leases file

    Only ``lease <ip> { ... }`` declarations are interpreted; inside them the
    ``starts``, ``hardware`` and ``client-hostname`` statements are read.
    A lease is recorded when both a hardware address and a start time were
    found before its closing ``}``.

    Args:
        leases_file: opened leases file. To skip already parsed part use seek
            before calling.

    Returns: Offset (as returned by tell()) just past the last completely
        parsed lease declaration, and an ActiveDevices instance holding the
        parsed leases. A declaration cut off by end of file does not advance
        the offset, so it is parsed again on the next call.

    Raises:
        ValueError: if a ``starts`` date is not ``YYYY/MM/DD HH:MM:SS;``.
        IndexError: if a ``lease`` or ``hardware`` statement lacks its value.
    """
    leases = ActiveDevices()

    ip: Optional[str] = None
    hwaddr: Optional[str] = None
    atime: Optional[float] = None
    name: Optional[str] = None

    in_lease = False
    offset = leases_file.tell()
    while True:
        # using readline because iter(file) blocks file.tell usage
        line = leases_file.readline()
        if not line:
            return offset, leases
        # Quote-aware so a '#' inside a host name is not taken as a comment.
        statement = _strip_lease_comment(line).strip()
        cmd = statement.split()
        if not cmd:
            continue
        field = cmd[0]
        if not in_lease:
            if field == 'lease':
                ip = cmd[1]
                hwaddr, atime, name = None, None, None
                in_lease = True
            continue
        if field == 'starts':
            # "starts <weekday> <date> <time>;" - timestamps are in UTC.
            dt = datetime.strptime(' '.join(cmd[2:]),
                                   '%Y/%m/%d %H:%M:%S;')
            atime = dt.replace(tzinfo=timezone.utc).timestamp()
        elif field == 'client-hostname':
            # Take everything after the keyword so names containing
            # whitespace are kept intact.
            name = _unquote_lease_value(statement[len(field):])
        elif field == 'hardware':
            # "hardware <type> <address>;" - drop the trailing ';'.
            hwaddr = cmd[2][:-1]
        elif field.startswith('}'):
            # Declaration complete: it is now safe to advance the offset.
            offset = leases_file.tell()
            in_lease = False
            if hwaddr is not None and atime is not None:
                leases.add(DhcpLease(hwaddr, atime, ip, name))
            hwaddr, atime = None, None
| |
| |
class DhcpdUpdater(MtimeUpdater):
    """Updater that reads leases written in the ISC dhcpd format."""

    def file_changed(self, f):
        """Parse the unread part of the lease file and merge the result.

        Args:
            f: Opened lease file positioned where parsing should resume.

        Returns: Offset reported by the parser, to be used as the next
            resume position.
        """
        parsed = parse_isc_dhcpd_leases(f)
        resume_at, found = parsed
        self.update(found)
        return resume_at