anabasis-clients/generate.py
2021-02-11 02:14:06 +01:00

222 lines
8.2 KiB
Python

import csv
import logging
import os
import re
import sqlite3
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import groupby
from operator import attrgetter
from time import sleep
from typing import List, Optional, Tuple

import click
import humanize
import routeros_api
from jinja2 import Environment, select_autoescape, FileSystemLoader

from config import Config
# Bind the Config class itself (it is not instantiated here); settings such as
# `computer_hostnames` are read directly off this object.
config = Config
@dataclass
class Lease:
    """One sighting of a client device, either live from the router or read back from the database."""

    ts: datetime  # when the device was observed
    mac: str  # MAC address of the device
    hostname: Optional[str]  # host name reported with the DHCP lease; may be missing
    ip: str  # IP address of the device

    @property
    def display(self):
        """Return the hostname when a non-empty one is known, otherwise the MAC address."""
        if self.hostname:
            return self.hostname
        return self.mac
@dataclass
class Status:
    """Summary of the current occupancy state, passed to the HTML template."""

    open: bool  # whether the place counts as open
    description: str  # short machine-friendly label
    text: str  # human-readable sentence describing the state
def _get_db(filepath: str):
    """Open the SQLite database at *filepath* and make sure the ``spottings`` table exists.

    Returns the open ``sqlite3`` connection; the table creation is committed
    before returning.
    """
    logging.debug(f"Opening database: {filepath}")
    schema = '''CREATE TABLE IF NOT EXISTS spottings (
id INTEGER PRIMARY KEY,
ts INTEGER,
mac TEXT,
hostname TEXT,
ip TEXT
)'''
    db = sqlite3.connect(filepath)
    db.cursor().execute(schema)
    db.commit()
    return db
def _fetch_leases(db, from_ts: datetime):
    """Return the spottings recorded after *from_ts* as ``Lease`` objects, newest first.

    *db* is an open SQLite connection containing the ``spottings`` table.
    Stored integer timestamps are converted back with ``datetime.fromtimestamp``.
    """
    threshold = int(from_ts.timestamp())
    rows = db.cursor().execute(
        'SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC',
        (threshold,)
    )
    return [
        Lease(datetime.fromtimestamp(seen_at), mac, hostname, ip)
        for seen_at, mac, hostname, ip in rows
    ]
def _is_human(lease: Lease):
if lease.hostname:
return not any(re.match(ch, lease.hostname) for ch in config.computer_hostnames)
else:
return True
# Row types handed to the HTML template. The type and field names are kept
# unchanged because the template (not visible here) may reference them.
_LogEntry = namedtuple('log_entry', ('ts', 'state', 'lease'))
_LeaderboardEntry = namedtuple('leaderboard_entry', ('name', 'total'))

# File extensions the main loop knows how to produce.
_SUPPORTED_OUTPUT_SUFFIXES = (".csv", ".lst", ".html")


def _match_registered_leases(registered_clients, dhcp_leases, ssid: str, now: datetime) -> List[Lease]:
    """Pair every wireless client whose SSID contains *ssid* with its active DHCP lease."""
    matched: List[Lease] = []
    for client in registered_clients:
        if ssid not in client['ssid']:
            continue
        lease = next(
            (entry for entry in dhcp_leases if entry.get('active-mac-address') == client['mac-address']),
            None
        )
        if lease is None:
            # Registered on the wireless side but holding no active DHCP lease.
            continue
        matched.append(
            Lease(ts=now, ip=lease['active-address'], mac=lease['active-mac-address'],
                  hostname=lease.get('host-name'))
        )
    # Named devices first (case-insensitive alphabetical), unnamed ones after
    # them in their original order (the sort is stable).
    matched.sort(key=lambda l: (not l.hostname, (l.hostname or "").lower()))
    return matched


def _compute_status(registered_leases: List[Lease]) -> Status:
    """Classify the current occupancy from the list of connected devices."""
    if not registered_leases:
        return Status(open=False, description='VOID', text="There are no devices connected?")
    people_cnt = sum(1 for lease in registered_leases if _is_human(lease))
    if people_cnt > 3:
        return Status(open=True, description='FILLED', text="There seems to be a lot of people!")
    if people_cnt > 0:
        return Status(open=True, description='POPULATED', text="There seem to be people!")
    return Status(open=False, description='EMPTY', text="There are only computers.")


def _store_spottings(db, timestamp: int, registered_leases: List[Lease]) -> None:
    """Insert one ``spottings`` row per connected device and commit."""
    db.executemany("INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)", [
        (timestamp, lease.mac, lease.hostname, lease.ip) for lease in registered_leases
    ])
    db.commit()


def _write_lease_rows(output_file: str, registered_leases: List[Lease], header: Optional[str] = None) -> None:
    """Write the leases as CSV rows to *output_file*, optionally preceded by a raw *header* string."""
    # newline='' is what the csv module asks for; without it the writer's
    # "\r\n" terminator is translated again on platforms that rewrite "\n".
    with open(output_file, 'w', newline='') as file:
        if header is not None:
            file.write(header)
        csv.writer(file).writerows(
            (lease.ip, lease.mac, lease.hostname or "???") for lease in registered_leases
        )


def _find_last_change(db, now: datetime, status: Status):
    """Return the newest sighting batch (last 7 days) whose open/closed state differs from *status*.

    The result is a dict with ``ts`` and ``leases`` keys, or ``None`` when no
    such batch exists.
    """
    for ts, leases in groupby(_fetch_leases(db, now - timedelta(days=7)), key=attrgetter('ts')):
        humans_present = [lease for lease in leases if _is_human(lease)]
        if bool(humans_present) != status.open:
            return {'ts': ts, 'leases': humans_present}
    return None


def _build_presence_log(db, now: datetime) -> list:
    """Build the arrival/departure log for the last day, newest entry first."""
    log = []
    last_seen: List[Lease] = []
    # _fetch_leases returns newest first; walk it oldest first to diff
    # consecutive sighting batches.
    for ts, leases in groupby(reversed(_fetch_leases(db, now - timedelta(days=1))), key=attrgetter('ts')):
        leases = list(leases)
        previous_macs = {lease.mac for lease in last_seen}
        current_macs = {lease.mac for lease in leases}
        log.extend(_LogEntry(ts, True, lease) for lease in leases if lease.mac not in previous_macs)
        log.extend(_LogEntry(ts, False, lease) for lease in last_seen if lease.mac not in current_macs)
        last_seen = leases
    log.reverse()
    return log


def _build_leaderboard(db, now: datetime, period: int) -> list:
    """Return the ten human devices with the most sightings over the last 120 days."""
    sightings = {}
    names = {}
    for lease in _fetch_leases(db, now - timedelta(days=120)):
        if not _is_human(lease):
            continue
        sightings[lease.mac] = sightings.get(lease.mac, 0) + 1
        if lease.hostname:
            # Rows arrive newest first, so this keeps the most recent
            # non-empty hostname for the device.
            names.setdefault(lease.mac, lease.hostname)
    # Rank on the numeric count; ranking on the humanized text would order
    # the entries alphabetically instead of by time spent.
    ranked = sorted(sightings.items(), key=lambda item: item[1], reverse=True)[:10]
    return [
        _LeaderboardEntry(
            names.get(mac) or mac,
            # Each sighting stands for one polling interval.
            humanize.naturaldelta(timedelta(seconds=count * period))
        )
        for mac, count in ranked
    ]


def _write_html(jinja_env, db, output_file: str, now: datetime, registered_leases: List[Lease],
                status: Status, period: int) -> None:
    """Render ``index.html`` with the current state and history into *output_file*."""
    last_change = _find_last_change(db, now, status)
    log = _build_presence_log(db, now)
    leaderboard = _build_leaderboard(db, now, period)
    logging.debug(f"Outputting HTML file into {output_file}...")
    # Render before opening so a template error does not leave a truncated file.
    out_str = jinja_env.get_template("index.html").render(
        now=now,
        leases=registered_leases,
        status=status,
        last_change=last_change,
        log=log,
        leaderboard=leaderboard
    )
    with open(output_file, 'w') as file:
        file.write(out_str)


@click.command()
@click.option('--address', default=os.getenv("ROUTER_IP", "192.168.42.1"), help='Address of the router.')
@click.option('--period', default=60, help='How often to check for clients (in seconds).')
@click.option('--ssid', default=os.getenv("SSID", "anabasis"), help='Limit clients to SSID containing this string.')
@click.option('-o', '--output', multiple=True, help="Output file.")
def run_forever(address: str, period: int, ssid: str, output: Tuple[str, ...]):
    """Poll the router for connected clients, record them and refresh the output files."""
    # NOTE: click uses the docstring above as the command's --help text, so it
    # is kept to one user-facing sentence.
    #
    # address: router to query; period: seconds slept between polls;
    # ssid: substring a client's SSID must contain; output: tuple of output
    # paths (click passes a tuple because the option is declared multiple=True),
    # each handled according to its .csv / .lst / .html extension.
    # The router password is read from the ROUTER_PASSWORD environment variable.
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - [%(levelname)s] %(message)s')
    db = _get_db("clients.sqlite3")
    connection = routeros_api.RouterOsApiPool(
        address,
        username='admin',
        password=os.environ["ROUTER_PASSWORD"],
        plaintext_login=True
    )
    jinja_env = Environment(
        loader=FileSystemLoader('templates'),
        autoescape=select_autoescape(['html', 'xml'])
    )

    # Report unusable output paths once instead of silently skipping them on every cycle.
    for output_file in output:
        if not output_file.endswith(_SUPPORTED_OUTPUT_SUFFIXES):
            logging.warning(f"Ignoring output file with unsupported extension: {output_file}")

    while True:
        logging.info(f"Querying router at {address}...")
        api = connection.get_api()
        currently_registered = api.get_resource('/caps-man/registration-table').call('print')
        logging.debug(f"Got {len(currently_registered)} registered clients.")
        dhcp_leases = api.get_resource('/ip/dhcp-server/lease').call('print')
        logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")

        now = datetime.now()
        timestamp = int(now.timestamp())

        registered_leases = _match_registered_leases(currently_registered, dhcp_leases, ssid, now)
        logging.info(f"Found {len(registered_leases)} registered leases.")
        logging.debug(", ".join([str(lease) for lease in registered_leases]))

        status = _compute_status(registered_leases)

        logging.debug("Logging into the database...")
        _store_spottings(db, timestamp, registered_leases)

        for output_file in output:
            if output_file.endswith(".csv"):
                logging.debug(f"Outputting CSV file into {output_file}...")
                _write_lease_rows(output_file, registered_leases)
            elif output_file.endswith(".lst"):
                logging.debug(f"Outputting LST file into {output_file}...")
                # Same rows as the CSV output, preceded by a timestamp line.
                _write_lease_rows(output_file, registered_leases, header=f"{now}\n")
            elif output_file.endswith(".html"):
                _write_html(jinja_env, db, output_file, now, registered_leases, status, period)

        logging.info(f"Sleeping for {period} seconds.")
        sleep(period)
# Script entry point: start the click command only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run_forever()