# blob: df40dae146315f3c75bfc6c4b4f41be215c96be9 [file] [log] [blame]  (source-viewer header, not part of the module)
import threading
import os
import io
import logging
from typing import Tuple, List, Optional, NamedTuple
from time import sleep, time, mktime
from datetime import datetime, timezone
# Module-level logger; it is also the default ``logger`` argument of
# Updater.__init__ below.
logger = logging.getLogger(__name__)
def strfts(ts, format='%d/%m/%Y %H:%M'):
    """Render a POSIX timestamp as text.

    Args:
        ts: seconds since the epoch, as accepted by datetime.fromtimestamp
            (which yields local time when no timezone is given).
        format: strftime pattern; the default is day/month/year hour:minute.

    Returns: the formatted date string.
    """
    moment = datetime.fromtimestamp(ts)
    return moment.strftime(format)
class DhcpLease(NamedTuple):
    """Single DHCP lease record produced by the lease parsers in this module."""
    # Hardware address of the client; ActiveDevices keys its table by it.
    hwaddr: Optional[str]
    # Activity time as a POSIX timestamp; ActiveDevices.add() substitutes the
    # current time when this is None.
    atime: Optional[float]
    # Address the lease was handed out for.
    ip: Optional[str]
    # Hostname recorded in the lease, if any.
    name: Optional[str]
class ActiveDevices:
    """Table holding the most recent lease seen for every hardware address."""

    def __init__(self):
        # Maps hardware address -> newest lease stored for that address.
        self._devices = {}

    def purge_stale(self, timeout):
        """Remove every lease whose atime is more than ``timeout`` seconds old.

        Args:
            timeout: maximum allowed age of a lease, in seconds.
        """
        now = time()
        # Iterate over a snapshot because entries are deleted inside the loop.
        for device in list(self._devices.values()):
            if now - device.atime > timeout:
                del self._devices[device.hwaddr]

    def add(self, lease: 'DhcpLease') -> bool:
        """Store ``lease`` unless an equally recent or newer one is present.

        A lease without an atime is stamped with the current time first.

        Args:
            lease: lease to store; it is keyed by its hwaddr.

        Returns: True when the lease was stored (unknown hwaddr, or strictly
            newer atime than the stored lease), False otherwise.
        """
        if lease.atime is None:
            lease = lease._replace(atime=time())
        known = self._devices.get(lease.hwaddr)
        if lease.hwaddr not in self._devices or known.atime < lease.atime:
            self._devices[lease.hwaddr] = lease
            return True
        return False

    def update(self, devices: 'ActiveDevices') -> List['DhcpLease']:
        """Add entries from another ActiveDevices instance.

        Args:
            devices: ActiveDevices instance whose leases should be merged in.

        Returns: list of the leases that were actually stored, i.e. those for
            which add() returned True.
        """
        updated = []
        for device in devices._devices.values():
            if self.add(device):
                updated.append(device)
        return updated
class Updater(threading.Thread):
    """Daemon thread base class that maintains a table of active devices.

    The table (``self.active``) is guarded by ``self.lock``. This class does
    not define run(); subclasses feed the table through update().
    """

    def __init__(self, timeout, logger=logger, *a, **kw):
        """Set up the device table and the underlying thread.

        Args:
            timeout: age in seconds after which a device is purged from the
                table (passed to ActiveDevices.purge_stale).
            logger: logger used for update messages.
            *a, **kw: forwarded to threading.Thread.__init__.
        """
        self.timeout = timeout
        self.lock = threading.Lock()
        self.logger = logger
        self.active = ActiveDevices()
        threading.Thread.__init__(self, *a, **kw)
        self.daemon = True

    def get_active_devices(self):
        """Purge stale entries and return a snapshot of the device table.

        Returns: a new dict mapping hardware address to lease; callers may
            use it without holding the lock.
        """
        with self.lock:
            self.active.purge_stale(self.timeout)
            return dict(self.active._devices)

    def get_device(self, ip):
        """Look up the active device currently holding ``ip``.

        Args:
            ip: address to search for.

        Returns: ``(hwaddr, name)`` of the first matching device, or
            ``(None, None)`` when no active device has that address.
        """
        # self.lock must NOT be held here: get_active_devices() acquires it
        # itself and threading.Lock is not reentrant, so taking it in both
        # places deadlocks. The returned dict is a private snapshot, which
        # makes iterating it without the lock safe.
        for device in self.get_active_devices().values():
            if device.ip == ip:
                return device.hwaddr, device.name
        return None, None

    def update(self, devices: ActiveDevices):
        """Merge leases from ``devices`` into the table, logging each change.

        Args:
            devices: ActiveDevices instance holding freshly parsed leases.
        """
        for device in devices._devices.values():
            # Hold the lock only for the table mutation, not for logging.
            with self.lock:
                changed = self.active.add(device)
            if changed:
                self.logger.info('updated %s with atime %s and ip %s',
                                 device.hwaddr, strfts(device.atime), device.ip)
#class CapUpdater(Updater):
# def __init__(self, cap_file, *a, **kw):
# self.cap_file = cap_file
# Updater.__init__(self, *a, **kw)
#
# def run(self):
# while True:
# try:
# with open(self.cap_file, 'r', buffering=0) as f:
# self.logger.info('Updater ready on cap file %s',
# self.cap_file)
# lines = [l.strip() for l in f.read().split('\n')]
# for hwaddr in lines:
# if hwaddr:
# self.update(hwaddr)
# self.logger.warning('Cap file %s closed, reopening',
# self.cap_file)
# except Exception as e:
# self.logger.error('Updater got an exception:\n' +
# traceback.format_exc(e))
# sleep(10.0)
class MtimeUpdater(Updater):
    """Updater that polls a lease file and passes new content to a parser.

    ``position`` remembers how far the file has been consumed and
    ``last_modified`` the modification time of the last processed version.
    Subclasses override file_changed() to do the actual parsing.
    """

    def __init__(self, lease_file, *a, **kw):
        # Nothing consumed yet: start at offset 0 with no known mtime.
        self.position = 0
        self.last_modified = 0
        self.lease_file = lease_file
        Updater.__init__(self, *a, **kw)

    def file_changed(self, f):
        """Hook called with the opened lease file after it was modified.

        Args:
            f: Lease file, already positioned after the previously parsed
               part, so its offset can be used to skip parsed lines.

        Returns: Offset to resume from next time. This base implementation
            parses nothing and reports the current offset.
        """
        current_offset = f.tell()
        return current_offset

    def _trigger_update(self):
        """Open the lease file, skip the consumed part and run file_changed()."""
        self.logger.info('Lease file changed, updating')
        with open(self.lease_file, 'r') as lease_fh:
            lease_fh.seek(self.position)
            resume_at = self.file_changed(lease_fh)
            self.position = resume_at

    def _poll_once(self):
        """Run a single polling pass over the lease file."""
        lease_stat = os.stat(self.lease_file)
        # A file shorter than the stored offset must have been replaced.
        if self.position > lease_stat.st_size:
            self.logger.info('leases file changed - reseting pointer')
            self.position = 0
        try:
            # Detect a cleanup performed by the DHCP server through the
            # timestamp of the "~" backup file. The original author noted that
            # cleanup during operation seemed broken on "customs", so this
            # branch might never run there.
            backup_mtime = os.stat(self.lease_file + '~').st_mtime
            if self.last_modified < backup_mtime:
                self.logger.info('leases file purged - reseting pointer')
                self.position = 0
        except FileNotFoundError:
            pass
        if self.last_modified < lease_stat.st_mtime:
            self._trigger_update()
            self.last_modified = lease_stat.st_mtime

    def run(self):
        """Periodically check whether the lease file has changed.

        The file is examined every 5 seconds; after an exception the error is
        logged and the next attempt happens 10 seconds later.

        From the ISC DHCPD manual:
          New leases are appended to the end of the dhcpd.leases file. In
          order to prevent the file from becoming arbitrarily large, from
          time to time dhcpd creates a new dhcpd.leases file from its in-core
          lease database. Once this file has been written to disk, the old
          file is renamed dhcpd.leases~, and the new file is renamed
          dhcpd.leases.

        Kea documentation[0] suggests identical behavior:
          For performance reasons, the server does not update the existing
          client's lease in the file, as this would potentially require
          rewriting the entire file. Instead, it simply appends the new lease
          information to the end of the file; the previous lease entries for
          the client are not removed.
          [ ... ]
          Lease file cleanup is performed by a separate process (in the
          background) to avoid a performance impact on the server process. To
          avoid conflicts between two processes using the same lease files,
          the LFC process starts with Kea opening a new lease file; the actual
          LFC process operates on the lease file that is no longer used by the
          server. There are also other files created as a side effect of the
          lease file cleanup

        [0]: https://kea.readthedocs.io/en/latest/arm/dhcp4-srv.html#why-is-lease-file-cleanup-necessary
        """
        while True:
            try:
                self._poll_once()
                sleep(5.0)
            except Exception:
                self.logger.exception('Exception in updater')
                sleep(10.0)
class DnsmasqUpdater(MtimeUpdater):
    """Placeholder updater for dnsmasq lease files; parsing is disabled."""

    def file_changed(self, f):
        """Refuse to parse until dnsmasq support is ported and tested.

        Args:
            f: opened lease file (unused).

        Raises:
            NotImplementedError: always.
        """
        # NOTE: a line-based parser used to sit unreachable behind this raise.
        # It split every line into five space-separated fields
        # (ts, hwaddr, ip, name, client_id) and called
        # self.update(hwaddr, int(ts), ip, name), which does not match
        # Updater.update(devices). A port has to collect DhcpLease entries in
        # an ActiveDevices instance, pass that to self.update() and return
        # the new file offset.
        raise NotImplementedError(
            "This was not tested after adding differential update")
def parse_isc_dhcpd_leases(leases_file: io.TextIOBase) -> Tuple[int, ActiveDevices]:
    """Parse an ISC dhcpd server leases file.

    Only ``lease <ip> {`` ... ``}`` blocks are interpreted. Within a block the
    ``starts``, ``client-hostname`` and ``hardware`` statements are read, and
    text after ``#`` is ignored. A block is recorded only when it supplied
    both a hardware address and a start time.

    Args:
        leases_file: opened leases file. To skip an already parsed part, seek
            before calling.

    Returns: Offset (as returned by tell()) just past the last completely
        parsed lease block, and an ActiveDevices instance with the parsed
        leases. A block still lacking its closing brace does not advance the
        offset.
    """
    parsed = ActiveDevices()
    ip: Optional[str] = None
    hwaddr: Optional[str] = None
    atime: Optional[float] = None
    name: Optional[str] = None
    inside_lease = False
    offset = leases_file.tell()
    while True:
        # readline() rather than iterating the file: iteration blocks tell()
        raw = leases_file.readline()
        if not raw:
            break
        tokens = raw.partition('#')[0].split()
        if not tokens:
            continue
        keyword = tokens[0]
        if not inside_lease:
            if keyword == 'lease':
                ip = tokens[1]
                hwaddr, atime, name = None, None, None
                inside_lease = True
            continue
        if keyword == 'starts':
            # e.g. "starts <weekday> YYYY/MM/DD HH:MM:SS;" - read as UTC
            stamp = ' '.join(tokens[2:])
            started = datetime.strptime(stamp, '%Y/%m/%d %H:%M:%S;')
            atime = started.replace(tzinfo=timezone.utc).timestamp()
        elif keyword == 'client-hostname':
            # drop the leading quote and the trailing quote + semicolon
            name = tokens[1][1:-2]
        elif keyword == 'hardware':
            # third token is the address followed by a semicolon
            hwaddr = tokens[2][:-1]
        elif keyword.startswith('}'):
            offset = leases_file.tell()
            inside_lease = False
            if hwaddr is not None and atime is not None:
                parsed.add(DhcpLease(hwaddr, atime, ip, name))
            hwaddr, atime = None, None
    return offset, parsed
class DhcpdUpdater(MtimeUpdater):
    """Updater for ISC dhcpd lease files."""

    def file_changed(self, f):
        """Parse the unread part of the lease file and merge the leases.

        Args:
            f: opened lease file positioned after the already parsed part.

        Returns: the offset reported by parse_isc_dhcpd_leases().
        """
        resume_at, found = parse_isc_dhcpd_leases(f)
        self.update(found)
        return resume_at
class KeaUpdater(MtimeUpdater):
    """Updater for Kea CSV lease files."""

    # Zero-based positions of the columns used here, following the lease file
    # header: address,hwaddr,client_id,valid_lifetime,expire,subnet_id,
    # fqdn_fwd,fqdn_rev,hostname,state,user_context,pool_id
    _COL_ADDRESS = 0
    _COL_HWADDR = 1
    _COL_VALID_LIFETIME = 3
    _COL_EXPIRE = 4
    _COL_HOSTNAME = 8

    def file_changed(self, f):
        """Parse the unread lease rows and merge them into the device table.

        The lease start time is computed as ``expire - valid_lifetime``.
        Columns are read by position, so rows with fewer or more trailing
        columns than the 12 listed above are accepted as long as the hostname
        column is present. Blank rows are ignored; rows that cannot be parsed
        are logged and skipped, so a single bad row cannot stall the updater.

        Args:
            f: opened lease file positioned after the already parsed part.

        Returns: file offset after the last row read.
        """
        leases = ActiveDevices()
        for line in f:
            # header line
            if line.startswith('address,'):
                continue
            if not line.strip():
                continue
            fields = line.rstrip('\r\n').split(',')
            try:
                started = (int(fields[self._COL_EXPIRE])
                           - int(fields[self._COL_VALID_LIFETIME]))
                lease = DhcpLease(fields[self._COL_HWADDR], started,
                                  fields[self._COL_ADDRESS],
                                  fields[self._COL_HOSTNAME])
            except (IndexError, ValueError):
                self.logger.warning('Skipping malformed Kea lease line: %r',
                                    line)
                continue
            leases.add(lease)
        self.update(leases)
        # tell() is usable again once the iteration above has hit end of file
        return f.tell()