blob: df40dae146315f3c75bfc6c4b4f41be215c96be9 [file] [log] [blame]
vuko3cd087d2021-12-28 13:19:40 +01001import threading
2import os
3import io
4
5import logging
6
7from typing import Tuple, List, Optional, NamedTuple
8from time import sleep, time, mktime
9from datetime import datetime, timezone
10
11logger = logging.getLogger(__name__)
12
def strfts(ts, format='%d/%m/%Y %H:%M', tz=None):
    """Format a POSIX timestamp as a human-readable string.

    Args:
        ts: Seconds since the epoch.
        format: ``strftime`` pattern; defaults to ``DD/MM/YYYY HH:MM``.
        tz: Optional ``tzinfo`` in which to render the time. ``None`` (the
            default) renders in the machine's local timezone.

    Returns: The formatted date/time string.
    """
    return datetime.fromtimestamp(ts, tz).strftime(format)
15
class DhcpLease(NamedTuple):
    """One DHCP lease: who holds which address and when it was last seen.

    Only ``hwaddr`` has to be supplied; the remaining fields default to
    ``None`` so callers need not spell out values they do not know.
    """
    # Hardware address of the client.
    hwaddr: Optional[str]
    # POSIX timestamp of the lease; None means "not known".
    atime: Optional[float] = None
    # IP address handed out with the lease.
    ip: Optional[str] = None
    # Host name associated with the lease, if any.
    name: Optional[str] = None
21
22
class ActiveDevices:
    """Keeps the most recent lease seen for every hardware address."""

    def __init__(self):
        # Maps hwaddr -> newest lease known for that address.
        self._devices = {}

    def purge_stale(self, timeout):
        """Forget every device whose lease is older than ``timeout`` seconds.

        Args:
            timeout: Maximum accepted age, in seconds, measured from now.
        """
        now = time()
        # Collect the keys first: deleting while iterating the dict would
        # raise RuntimeError.
        stale = [hwaddr for hwaddr, device in self._devices.items()
                 if now - device.atime > timeout]
        for hwaddr in stale:
            del self._devices[hwaddr]

    def add(self, lease: 'DhcpLease') -> bool:
        """Record ``lease`` unless a lease at least as recent is already stored.

        A lease without ``atime`` is stamped with the current time.

        Args:
            lease: Lease to record.

        Returns: True if the lease was stored, False if it was ignored.
        """
        if lease.atime is None:
            lease = lease._replace(atime=time())
        known = self._devices.get(lease.hwaddr)
        if known is None or known.atime < lease.atime:
            self._devices[lease.hwaddr] = lease
            return True
        return False

    def update(self, devices: 'ActiveDevices') -> List['DhcpLease']:
        """Add entries from another ActiveDevices instance.

        Args:
            devices: ActiveDevices instance whose entries should be merged in.

        Returns: list of leases from ``devices`` that were actually stored.
        """
        updated = []
        for device in devices._devices.values():
            if self.add(device):
                updated.append(device)
        return updated
55
class Updater(threading.Thread):
    """Daemon thread base class that tracks recently seen devices.

    All access to ``self.active`` goes through ``self.lock``, so the query
    methods may be called from other threads.
    """

    def __init__(self, timeout, logger=logger, *a, **kw):
        """Set up the shared state and the underlying daemon thread.

        Args:
            timeout: Maximum age in seconds of a lease before the device is
                dropped from the active set.
            logger: Logger used for progress messages.
            *a, **kw: Passed through to ``threading.Thread``.
        """
        super().__init__(*a, **kw)
        self.daemon = True
        self.timeout = timeout
        self.lock = threading.Lock()
        self.logger = logger
        self.active = ActiveDevices()

    def get_active_devices(self):
        """Purge stale devices and return a snapshot of the remaining ones.

        Returns: A new dict mapping hwaddr to lease; mutating it does not
            affect the updater.
        """
        with self.lock:
            self.active.purge_stale(self.timeout)
            return dict(self.active._devices)

    def get_device(self, ip):
        """Look up the active device currently holding ``ip``.

        Args:
            ip: IP address to search for.

        Returns: ``(hwaddr, name)`` of the first matching device, or
            ``(None, None)`` when no active device has that address.
        """
        # get_active_devices() acquires self.lock itself and hands back a
        # private snapshot. threading.Lock is not reentrant, so taking the
        # lock here as well would block forever.
        for device in self.get_active_devices().values():
            if device.ip == ip:
                return device.hwaddr, device.name
        return None, None

    def update(self, devices: 'ActiveDevices'):
        """Merge freshly parsed leases into the active set, logging changes.

        Args:
            devices: ActiveDevices instance holding the leases to merge.
        """
        for device in devices._devices.values():
            # Hold the lock per device only, so readers are not blocked
            # for the duration of a large update.
            with self.lock:
                changed = self.active.add(device)
            if changed:
                self.logger.info('updated %s with atime %s and ip %s',
                                 device.hwaddr, strfts(device.atime), device.ip)
85
86#class CapUpdater(Updater):
87# def __init__(self, cap_file, *a, **kw):
88# self.cap_file = cap_file
89# Updater.__init__(self, *a, **kw)
90#
91# def run(self):
92# while True:
93# try:
94# with open(self.cap_file, 'r', buffering=0) as f:
95# self.logger.info('Updater ready on cap file %s',
96# self.cap_file)
97# lines = [l.strip() for l in f.read().split('\n')]
98# for hwaddr in lines:
99# if hwaddr:
100# self.update(hwaddr)
101# self.logger.warning('Cap file %s closed, reopening',
102# self.cap_file)
103# except Exception as e:
104# self.logger.error('Updater got an exception:\n' +
105# traceback.format_exc(e))
106# sleep(10.0)
107
108
class MtimeUpdater(Updater):
    """Updater that polls a lease file and reacts when it is modified.

    Subclasses override ``file_changed`` to parse the part of the file that
    has not been seen yet.

    Attributes:
        poll_interval: Seconds to wait between two checks of the lease file.
        error_interval: Seconds to wait before retrying after a failed check.
    """

    poll_interval = 5.0
    error_interval = 10.0

    def __init__(self, lease_file, *a, **kw):
        """Remember the lease file path and start from its beginning.

        Args:
            lease_file: Path of the lease file to watch.
            *a, **kw: Passed through to ``Updater``.
        """
        self.lease_file = lease_file
        # Offset (as returned by tell()) up to which the file was parsed.
        self.position = 0
        # mtime of the lease file at the last successful update.
        self.last_modified = 0
        super().__init__(*a, **kw)

    def file_changed(self, f):
        """Callback on changed lease file

        Args:
            f: Lease file. File offset can be used to skip already parsed lines.

        Returns: New byte offset pointing after last parsed byte.
        """
        return f.tell()

    def _trigger_update(self):
        """Open the lease file at the saved offset and let file_changed parse it."""
        self.logger.info('Lease file changed, updating')
        with open(self.lease_file, 'r') as f:
            f.seek(self.position)
            self.position = self.file_changed(f)

    def _reset_if_rewritten(self, size):
        """Rewind to the start of the file when it appears to have been rewritten.

        Args:
            size: Current size of the lease file in bytes.
        """
        if size < self.position:
            self.logger.info('leases file changed - reseting pointer')
            self.position = 0
        try:
            # checking if DHCPD performed cleanup
            # cleanup during operation seems to be currently broken
            # on customs so this could never execute
            purge_time = os.stat(self.lease_file + '~').st_mtime
        except FileNotFoundError:
            return
        if purge_time > self.last_modified:
            self.logger.info('leases file purged - reseting pointer')
            self.position = 0

    def _poll_once(self):
        """Check the lease file once and parse it if it was modified.

        Returns: True if an update was triggered, False otherwise.
        """
        stat = os.stat(self.lease_file)
        self._reset_if_rewritten(stat.st_size)
        if stat.st_mtime > self.last_modified:
            self._trigger_update()
            # Only remember the mtime after a successful parse, so a failed
            # update is retried on the next poll.
            self.last_modified = stat.st_mtime
            return True
        return False

    def run(self):
        """Periodically check if file has changed

        From ISC DHCPD manual:

            New leases are appended to the end of the dhcpd.leases file. In
            order to prevent the file from becoming arbitrarily large, from
            time to time dhcpd creates a new dhcpd.leases file from its in-core
            lease database. Once this file has been written to disk, the old
            file is renamed dhcpd.leases~, and the new file is renamed
            dhcpd.leases.

        Kea documentation[0] suggests identical behavior:

            For performance reasons, the server does not update the existing
            client's lease in the file, as this would potentially require
            rewriting the entire file. Instead, it simply appends the new lease
            information to the end of the file; the previous lease entries for
            the client are not removed.

            [ ... ]

            Lease file cleanup is performed by a separate process (in the
            background) to avoid a performance impact on the server process. To
            avoid conflicts between two processes using the same lease files,
            the LFC process starts with Kea opening a new lease file; the actual
            LFC process operates on the lease file that is no longer used by the
            server. There are also other files created as a side effect of the
            lease file cleanup

        [0]: https://kea.readthedocs.io/en/latest/arm/dhcp4-srv.html#why-is-lease-file-cleanup-necessary
        """
        while True:
            try:
                self._poll_once()
            except Exception:
                # Keep the thread alive: log and retry after a longer pause.
                self.logger.exception('Exception in updater')
                delay = self.error_interval
            else:
                delay = self.poll_interval
            sleep(delay)
189
190
class DnsmasqUpdater(MtimeUpdater):
    """Placeholder updater for dnsmasq lease files; parsing is disabled."""

    def file_changed(self, f):
        """Refuse to parse the lease file.

        The former implementation was switched off when differential
        updates were introduced and was never re-tested. It split every
        line on single spaces into ``ts, hwaddr, ip, name, client_id`` and
        called ``self.update(hwaddr, int(ts), ip, name)``. That call does
        not match ``Updater.update``, which takes a single ActiveDevices
        instance, so the unreachable code was removed rather than kept in
        a broken state.

        Args:
            f: Lease file opened at the first unparsed offset (unused).

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError(
            "This was not tested after adding differential update")
199
def _parse_isc_lease_time(tokens: List[str]) -> float:
    """Convert the tokens following ``starts`` into a POSIX timestamp.

    Two notations are understood: ``WEEKDAY YYYY/MM/DD HH:MM:SS;``, which is
    interpreted as UTC, and ``epoch SECONDS;``.

    Raises:
        ValueError: if the tokens match neither notation.
    """
    if len(tokens) >= 2 and tokens[0] == 'epoch':
        return float(tokens[1].rstrip(';'))
    stamp = datetime.strptime(' '.join(tokens[1:]), '%Y/%m/%d %H:%M:%S;')
    return stamp.replace(tzinfo=timezone.utc).timestamp()


def parse_isc_dhcpd_leases(leases_file: io.TextIOBase) -> Tuple[int, 'ActiveDevices']:
    """Parse ISC dhcpd server leases file

    Entries without both a hardware address and a start time are ignored,
    as are statements that cannot be parsed. An entry that is not yet
    closed by ``}`` is not counted as parsed, so it is read again on the
    next call.

    Args:
        leases_file: opened leases file. To skip already parsed part use seek
            before calling.

    Returns: Byte offset (as returned by tell()) of last parsed entry and
        an ActiveDevices instance holding the parsed leases
    """
    leases = ActiveDevices()

    ip: Optional[str] = None
    hwaddr: Optional[str] = None
    atime: Optional[float] = None
    name: Optional[str] = None

    in_lease = False
    offset = leases_file.tell()
    while True:
        # using readline because iter(file) blocks file.tell usage
        line = leases_file.readline()
        if not line:
            return offset, leases
        cmd = line.split('#')[0].split()
        if not cmd:
            continue
        field = cmd[0]

        if not in_lease:
            if field == 'lease' and len(cmd) > 1:
                ip = cmd[1]
                hwaddr, atime, name = None, None, None
                in_lease = True
            continue

        if field == 'starts':
            try:
                atime = _parse_isc_lease_time(cmd[1:])
            except ValueError:
                # One bad date must not block every later lease.
                logger.warning('ignoring unparsable lease date: %r', line)
                atime = None
        elif field == 'client-hostname':
            # Re-join the tokens so a quoted name containing spaces is
            # kept whole, then drop the trailing ';' and the quotes.
            name = ' '.join(cmd[1:]).rstrip(';').strip('"')
        elif field == 'hardware':
            # "hardware <type> <address>;"
            if len(cmd) > 2:
                hwaddr = cmd[2].rstrip(';')
        elif field.startswith('}'):
            offset = leases_file.tell()
            in_lease = False
            if hwaddr is not None and atime is not None:
                leases.add(DhcpLease(hwaddr, atime, ip, name))
            hwaddr, atime = None, None
248
249
class DhcpdUpdater(MtimeUpdater):
    """Updater fed from an ISC dhcpd leases file."""

    def file_changed(self, f):
        """Parse the unread part of the leases file and merge the result.

        Args:
            f: Lease file positioned at the first unparsed offset.

        Returns: Offset reported by the parser for the last parsed entry.
        """
        parsed = parse_isc_dhcpd_leases(f)
        new_position, found = parsed[0], parsed[1]
        self.update(found)
        return new_position
Ari Gerus1bcaa972024-03-19 18:35:00 +0100255
class KeaUpdater(MtimeUpdater):
    """Updater fed from a Kea lease file in CSV form."""

    @staticmethod
    def _parse_lease_line(line):
        """Extract the lease fields from one row of the Kea lease file.

        Column order, taken directly from the kea leases file header:
        address, hwaddr, client_id, valid_lifetime, expire, subnet_id,
        fqdn_fwd, fqdn_rev, hostname, state, user_context, pool_id.
        Only the columns up to ``hostname`` are required, so rows with a
        different number of trailing columns are accepted.

        Args:
            line: One line of the lease file.

        Returns: ``(hwaddr, atime, address, hostname)`` where ``atime`` is
            ``expire - valid_lifetime``, or None for the header line and
            blank lines.

        Raises:
            ValueError: if the row has too few columns or non-numeric
                ``expire`` / ``valid_lifetime`` values.
        """
        row = line.strip()
        if not row or row.startswith('address,'):
            return None
        columns = row.split(',')
        if len(columns) < 9:
            raise ValueError(
                'expected at least 9 columns, got %d' % len(columns))
        address, hwaddr, _client_id, valid_lifetime, expire = columns[:5]
        hostname = columns[8]
        return hwaddr, int(expire) - int(valid_lifetime), address, hostname

    def file_changed(self, f):
        """Parse newly appended rows and merge them into the active set.

        Malformed rows are logged and skipped so that one bad row cannot
        block all later updates. A last row without a terminating newline
        is treated as still being written and left for the next call.

        Args:
            f: Lease file positioned at the first unparsed offset.

        Returns: Offset just past the last complete row that was read.
        """
        leases = ActiveDevices()
        offset = f.tell()
        while True:
            # readline() instead of iterating: tell() is not usable while
            # a file iterator is active.
            line = f.readline()
            if not line.endswith('\n'):
                # End of file, or an incomplete trailing row.
                break
            offset = f.tell()
            try:
                fields = self._parse_lease_line(line)
            except ValueError:
                self.logger.warning(
                    'skipping malformed Kea lease line: %r', line)
                continue
            if fields is not None:
                leases.add(DhcpLease(*fields))
        self.update(leases)
        return offset