blob: bee2ace29b9e53d6b7705e53a0a22c8f4188e8a2 [file] [log] [blame]
vuko3cd087d2021-12-28 13:19:40 +01001import threading
2import os
3import io
4
5import logging
6
7from typing import Tuple, List, Optional, NamedTuple
8from time import sleep, time, mktime
9from datetime import datetime, timezone
10
11logger = logging.getLogger(__name__)
12
13def strfts(ts, format='%d/%m/%Y %H:%M'):
14 return datetime.fromtimestamp(ts).strftime(format)
15
16class DhcpLease(NamedTuple):
17 hwaddr: Optional[str]
18 atime: Optional[float]
19 ip: Optional[str]
20 name: Optional[str]
21
22
23class ActiveDevices:
24 def __init__(self):
25 self._devices = {}
26
27 def purge_stale(self, timeout):
28 now = time()
29 for device in list(self._devices.values()):
30 if now - device.atime > timeout:
31 del self._devices[device.hwaddr]
32
33 def add(self, lease: DhcpLease) -> bool:
34 if lease.atime is None:
35 lease = lease._replace(atime=time())
36 if lease.hwaddr not in self._devices or self._devices[lease.hwaddr].atime < lease.atime:
37 self._devices[lease.hwaddr] = lease
38 return True
39 return False
40
41 def update(self, devices) -> List[str]:
42 '''Add entries from another ActiveDevices instance
43
44 Args:
45 devices: list of entries to be added
46
47 Returns: list of updated enties
48 '''
49
50 updated = []
51 for device in devices._devices.values():
52 if self.add(device):
53 updated.append(device)
54 return updated
55
56class Updater(threading.Thread):
57 def __init__(self, timeout, logger=logger, *a, **kw):
58 self.timeout = timeout
59 self.lock = threading.Lock()
60 self.logger = logger
61 self.active = ActiveDevices()
62 threading.Thread.__init__(self, *a, **kw)
63 self.daemon = True
64
65 def get_active_devices(self):
66 with self.lock:
67 self.active.purge_stale(self.timeout)
68 return dict(self.active._devices)
69
70 def get_device(self, ip):
71 with self.lock:
72 active_devices = iter(self.get_active_devices().values())
73 for device in active_devices:
74 if device.ip == ip:
75 return device.hwaddr, device.name
76 return None, None
77
78 def update(self, devices: ActiveDevices):
79 for device in devices._devices.values():
80 with self.lock:
81 changed = self.active.add(device)
82 if changed:
83 self.logger.info('updated %s with atime %s and ip %s',
84 device.hwaddr, strfts(device.atime), device.ip)
85
86#class CapUpdater(Updater):
87# def __init__(self, cap_file, *a, **kw):
88# self.cap_file = cap_file
89# Updater.__init__(self, *a, **kw)
90#
91# def run(self):
92# while True:
93# try:
94# with open(self.cap_file, 'r', buffering=0) as f:
95# self.logger.info('Updater ready on cap file %s',
96# self.cap_file)
97# lines = [l.strip() for l in f.read().split('\n')]
98# for hwaddr in lines:
99# if hwaddr:
100# self.update(hwaddr)
101# self.logger.warning('Cap file %s closed, reopening',
102# self.cap_file)
103# except Exception as e:
104# self.logger.error('Updater got an exception:\n' +
105# traceback.format_exc(e))
106# sleep(10.0)
107
108
109class MtimeUpdater(Updater):
110 def __init__(self, lease_file, *a, **kw):
111 self.lease_file = lease_file
112 self.position = 0
113 self.last_modified = 0
114 Updater.__init__(self, *a, **kw)
115
116 def file_changed(self, f):
117 """Callback on changed lease file
118
119 Args:
120 f: Lease file. File offset can be used to skip already parsed lines.
121
122 Returns: New byte offset pointing after last parsed byte.
123 """
124 return f.tell()
125
126 def _trigger_update(self):
127 self.logger.info('Lease file changed, updating')
128 with open(self.lease_file, 'r') as f:
129 f.seek(self.position)
130 self.position = self.file_changed(f)
131
132 def run(self):
133 """Periodicaly check if file has changed
134
135 From ISC DHCPD manual:
136
137 New leases are appended to the end of the dhcpd.leases file. In
138 order to prevent the file from becoming arbitrarily large, from
139 time to time dhcpd creates a new dhcpd.leases file from its in-core
140 lease database. Once this file has been written to disk, the old
141 file is renamed dhcpd.leases~, and the new file is renamed
142 dhcpd.leases.
143 """
144 while True:
145 try:
146 stat = os.stat(self.lease_file)
147 mtime = stat.st_mtime
148 size = stat.st_size
149 if size < self.position:
150 self.logger.info('leases file changed - reseting pointer')
151 self.position = 0
152 try:
153 # checking if DHCPD performed cleanup
154 # cleanup during operation seems to be currently broken
155 # on customs so this could never execute
156 purge_time = os.stat(self.lease_file + '~').st_mtime
157 if purge_time > self.last_modified:
158 self.logger.info('leases file purged - reseting pointer')
159 self.position = 0
160 except FileNotFoundError:
161 pass
162 if mtime > self.last_modified:
163 self._trigger_update()
164 self.last_modified = mtime
165 sleep(5.0)
166 except Exception:
167 self.logger.exception('Exception in updater')
168 sleep(10.0)
169
170
171class DnsmasqUpdater(MtimeUpdater):
172 def file_changed(self, f):
173 raise NotImplementedError(
174 "This was not tested after adding differential update")
175 for line in f:
176 ts, hwaddr, ip, name, client_id = line.split(' ')
177 self.update(hwaddr, int(ts), ip, name)
178 return f.tell()
179
180def parse_isc_dhcpd_leases(leases_file: io.TextIOBase) -> Tuple[int, ActiveDevices]:
181 """Parse ISC dhcpd server leases file
182
183 Args:
184 leases_file: opened leases file. To skip already parsed part use seek
185 before calling.
186
187 Returns: Byte offset (as returned by tell()) of last parsed entry and
188 dictionary of parsed leases
189 """
190 leases = ActiveDevices()
191
192 ip: Optional[str] = None
193 hwaddr: Optional[str] = None
194 atime: Optional[float] = None
195 name: Optional[str] = None
196
197 lease = False
198 offset = leases_file.tell()
199 while True:
200 # using readline because iter(file) blocks file.tell usage
201 line = leases_file.readline()
202 if not line:
203 return offset, leases
204 line = line.split('#')[0]
205 cmd = line.strip().split()
206 if not cmd:
207 continue
208 if lease:
209 field = cmd[0]
210 if(field == 'starts'):
211 dt = datetime.strptime(' '.join(cmd[2:]),
212 '%Y/%m/%d %H:%M:%S;')
213 atime = dt.replace(tzinfo=timezone.utc).timestamp()
214 if(field == 'client-hostname'):
215 name = cmd[1][1:-2]
216 if(field == 'hardware'):
217 hwaddr = cmd[2][:-1]
218 if(field.startswith('}')):
219 offset = leases_file.tell()
220 lease = False
221 if hwaddr is not None and atime is not None:
222 leases.add(DhcpLease(hwaddr, atime, ip, name))
223 hwaddr, atime = None, None
224 elif cmd[0] == 'lease':
225 ip = cmd[1]
226 hwaddr, atime, name = None, None, None
227 lease = True
228
229
230class DhcpdUpdater(MtimeUpdater):
231 def file_changed(self, f):
232 offset, devices = parse_isc_dhcpd_leases(f)
233 self.update(devices)
234 return offset