# blob: bee2ace29b9e53d6b7705e53a0a22c8f4188e8a2 [file] [log] [blame]
import threading
import os
import io
import logging
from typing import Tuple, List, Optional, NamedTuple
from time import sleep, time, mktime
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
def strfts(ts, format='%d/%m/%Y %H:%M'):
    """Render a POSIX timestamp as text.

    Args:
        ts: seconds since the epoch, as accepted by datetime.fromtimestamp
            (which converts to local time when no tz is given).
        format: strftime pattern; defaults to day/month/year hour:minute.

    Returns: the formatted date string.
    """
    moment = datetime.fromtimestamp(ts)
    return moment.strftime(format)
class DhcpLease(NamedTuple):
    """One DHCP lease record; every field may be None when not known."""

    # hardware address of the client, used as the key in ActiveDevices
    hwaddr: Optional[str]
    # POSIX timestamp of the lease; ActiveDevices.add fills in "now" if None
    atime: Optional[float]
    # IP address the lease refers to
    ip: Optional[str]
    # hostname reported for the client, if any
    name: Optional[str]
class ActiveDevices:
    """Collection holding the newest known lease per hardware address."""

    def __init__(self):
        # maps hwaddr -> most recent lease stored for that address
        self._devices = {}

    def purge_stale(self, timeout):
        """Remove every lease older than `timeout` seconds.

        Args:
            timeout: maximum allowed age (now - atime) in seconds.
        """
        now = time()
        # iterate over a snapshot because entries are deleted while looping
        for device in list(self._devices.values()):
            if now - device.atime > timeout:
                del self._devices[device.hwaddr]

    def add(self, lease: "DhcpLease") -> bool:
        """Store `lease` unless an equally new or newer one is already known.

        A lease without atime is stamped with the current time first.

        Args:
            lease: lease to be stored, keyed by its hwaddr.
        Returns: True if the lease was stored, False if it was ignored.
        """
        if lease.atime is None:
            lease = lease._replace(atime=time())
        known = self._devices.get(lease.hwaddr)
        if known is None or known.atime < lease.atime:
            self._devices[lease.hwaddr] = lease
            return True
        return False

    def update(self, devices: "ActiveDevices") -> "List[DhcpLease]":
        """Add entries from another ActiveDevices instance

        Args:
            devices: ActiveDevices instance whose leases should be merged in
        Returns: list of leases from `devices` that were actually stored
        """
        return [device for device in devices._devices.values()
                if self.add(device)]
class Updater(threading.Thread):
    """Daemon thread base class guarding an ActiveDevices set with a lock.

    Args:
        timeout: age in seconds after which a device is purged as stale.
        logger: logger used for update messages.
        *a, **kw: passed through to threading.Thread.
    """

    def __init__(self, timeout, logger=logger, *a, **kw):
        self.timeout = timeout
        self.lock = threading.Lock()
        self.logger = logger
        self.active = ActiveDevices()
        threading.Thread.__init__(self, *a, **kw)
        self.daemon = True

    def get_active_devices(self):
        """Purge stale entries and return a copy of the active devices.

        Returns: dict mapping hwaddr to lease; a copy, so callers may use it
        without holding the lock.
        """
        with self.lock:
            self.active.purge_stale(self.timeout)
            return dict(self.active._devices)

    def get_device(self, ip):
        """Look up the active device that currently holds `ip`.

        Args:
            ip: IP address to search for.
        Returns: (hwaddr, name) of the first match, or (None, None).
        """
        # get_active_devices() acquires self.lock itself. threading.Lock is
        # not reentrant, so holding it here as well would block forever.
        for device in self.get_active_devices().values():
            if device.ip == ip:
                return device.hwaddr, device.name
        return None, None

    def update(self, devices: ActiveDevices):
        """Merge leases from `devices` into the active set, logging changes.

        Args:
            devices: ActiveDevices instance with newly parsed leases.
        """
        for device in devices._devices.values():
            with self.lock:
                changed = self.active.add(device)
                # add() may have filled in a missing atime, so log the stored
                # lease rather than the incoming one
                stored = self.active._devices.get(device.hwaddr)
            if changed:
                self.logger.info('updated %s with atime %s and ip %s',
                                 stored.hwaddr, strfts(stored.atime),
                                 stored.ip)
#class CapUpdater(Updater):
# def __init__(self, cap_file, *a, **kw):
# self.cap_file = cap_file
# Updater.__init__(self, *a, **kw)
#
# def run(self):
# while True:
# try:
# with open(self.cap_file, 'r', buffering=0) as f:
# self.logger.info('Updater ready on cap file %s',
# self.cap_file)
# lines = [l.strip() for l in f.read().split('\n')]
# for hwaddr in lines:
# if hwaddr:
# self.update(hwaddr)
# self.logger.warning('Cap file %s closed, reopening',
# self.cap_file)
# except Exception as e:
# self.logger.error('Updater got an exception:\n' +
# traceback.format_exc(e))
# sleep(10.0)
class MtimeUpdater(Updater):
    """Updater that polls a lease file and reacts when its mtime advances.

    Args:
        lease_file: path of the lease file to watch.
        *a, **kw: passed through to Updater.
    """

    def __init__(self, lease_file, *a, **kw):
        self.lease_file = lease_file
        # offset up to which the lease file has already been parsed
        self.position = 0
        # mtime of the lease file at the time of the last update
        self.last_modified = 0
        Updater.__init__(self, *a, **kw)

    def file_changed(self, f):
        """Callback on changed lease file

        Subclasses override this to parse the new content.

        Args:
            f: Lease file. File offset can be used to skip already parsed
                lines.
        Returns: New byte offset pointing after last parsed byte.
        """
        return f.tell()

    def _trigger_update(self):
        """Open the lease file at the stored offset and run file_changed."""
        self.logger.info('Lease file changed, updating')
        with open(self.lease_file, 'r') as lease_file:
            lease_file.seek(self.position)
            new_position = self.file_changed(lease_file)
            self.position = new_position

    def run(self):
        """Periodically check if file has changed

        From ISC DHCPD manual:
            New leases are appended to the end of the dhcpd.leases file. In
            order to prevent the file from becoming arbitrarily large, from
            time to time dhcpd creates a new dhcpd.leases file from its in-core
            lease database. Once this file has been written to disk, the old
            file is renamed dhcpd.leases~, and the new file is renamed
            dhcpd.leases.
        """
        while True:
            try:
                lease_stat = os.stat(self.lease_file)
                # a file shorter than the stored offset must have been replaced
                if lease_stat.st_size < self.position:
                    self.logger.info('leases file changed - reseting pointer')
                    self.position = 0
                try:
                    # checking if DHCPD performed cleanup
                    # cleanup during operation seems to be currently broken
                    # on customs so this could never execute
                    backup_mtime = os.stat(self.lease_file + '~').st_mtime
                except FileNotFoundError:
                    # no backup file, so there is nothing to compare against
                    pass
                else:
                    if backup_mtime > self.last_modified:
                        self.logger.info(
                            'leases file purged - reseting pointer')
                        self.position = 0
                if lease_stat.st_mtime > self.last_modified:
                    self._trigger_update()
                    self.last_modified = lease_stat.st_mtime
                sleep(5.0)
            except Exception:
                # keep the thread alive; log and retry after a longer pause
                self.logger.exception('Exception in updater')
                sleep(10.0)
class DnsmasqUpdater(MtimeUpdater):
    """Placeholder updater for dnsmasq lease files; parsing is disabled."""

    def file_changed(self, f):
        """Refuse to parse until a differential dnsmasq parser exists.

        Args:
            f: opened lease file (unused).
        Raises:
            NotImplementedError: always.
        """
        # An unreachable draft used to follow this raise. It was stale: it
        # called self.update(hwaddr, int(ts), ip, name) although update()
        # takes a single ActiveDevices argument, and it called f.tell() after
        # iterating over the file. The draft split every line on single
        # spaces into ts, hwaddr, ip, name and client_id -- a possible
        # starting point for a real implementation.
        raise NotImplementedError(
            "This was not tested after adding differential update")
def parse_isc_dhcpd_leases(leases_file: io.TextIOBase) -> Tuple[int, ActiveDevices]:
    """Parse ISC dhcpd server leases file

    Only `lease <ip> { ... }` entries are read; within them the `starts`,
    `hardware` and `client-hostname` statements are used. An entry is stored
    once its closing brace is seen and only if it has both a hardware
    address and a start time.

    Args:
        leases_file: opened leases file. To skip already parsed part use seek
            before calling.
    Returns: Offset (as returned by tell()) just after the last completely
        parsed lease entry -- the starting offset if none was completed --
        and an ActiveDevices instance holding the parsed leases.
    Raises:
        ValueError: if a complete `starts` line has an unexpected date format.
        IndexError: if a complete `lease` or `hardware` line lacks its value.
    """
    leases = ActiveDevices()
    ip: Optional[str] = None
    hwaddr: Optional[str] = None
    atime: Optional[float] = None
    name: Optional[str] = None
    in_lease = False
    offset = leases_file.tell()
    while True:
        # using readline because iter(file) blocks file.tell usage
        line = leases_file.readline()
        if not line:
            return offset, leases
        statement = line.split('#')[0].strip()
        cmd = statement.split()
        if not cmd:
            continue
        field = cmd[0]
        if not line.endswith(('\n', '\r')) and not field.startswith('}'):
            # The last line is still being written. Its entry cannot be
            # complete yet, so stop here instead of failing on a truncated
            # statement; the caller re-reads from `offset` next time.
            return offset, leases
        if not in_lease:
            if field == 'lease':
                ip = cmd[1]
                hwaddr, atime, name = None, None, None
                in_lease = True
            continue
        if field == 'starts':
            # e.g. "starts 2 2019/01/01 12:00:00;" -- cmd[1] is the weekday
            dt = datetime.strptime(' '.join(cmd[2:]),
                                   '%Y/%m/%d %H:%M:%S;')
            atime = dt.replace(tzinfo=timezone.utc).timestamp()
        elif field == 'client-hostname':
            # take everything after the keyword so that quoted hostnames
            # containing whitespace are kept intact
            value = statement[len(field):].strip().rstrip(';')
            if len(value) >= 2 and value[0] == value[-1] == '"':
                value = value[1:-1]
            name = value
        elif field == 'hardware':
            # e.g. "hardware ethernet aa:bb:cc:dd:ee:ff;"
            hwaddr = cmd[2][:-1]
        elif field.startswith('}'):
            # entry complete: everything up to here never needs re-parsing
            offset = leases_file.tell()
            in_lease = False
            if hwaddr is not None and atime is not None:
                leases.add(DhcpLease(hwaddr, atime, ip, name))
            hwaddr, atime = None, None
class DhcpdUpdater(MtimeUpdater):
    """MtimeUpdater that reads leases written by ISC dhcpd."""

    def file_changed(self, f):
        """Parse new lease entries from `f` and merge them into the active set.

        Args:
            f: opened lease file, positioned after the already parsed part.
        Returns: offset reported by parse_isc_dhcpd_leases, to resume from.
        """
        new_position, parsed_leases = parse_isc_dhcpd_leases(f)
        self.update(parsed_leases)
        return new_position