anabasis-clients/generate.py

328 lines
11 KiB
Python

import csv
import logging
import os
import re
import sqlite3
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import groupby
from operator import attrgetter
from time import sleep
from typing import List, Optional
import click
import humanize
import requests
import routeros_api
from jinja2 import Environment, select_autoescape, FileSystemLoader
from config import Config
# Module-wide settings handle. The imported ``Config`` object is bound as-is
# (it is not called here), so settings such as ``computer_hostnames`` are
# read directly off it.
config = Config
@dataclass
class Lease:
    """One sighting of a network client at a point in time."""

    ts: datetime  # when the client was seen
    mac: str  # hardware address of the client
    hostname: Optional[str]  # host name reported for the lease, if any
    ip: str  # address assigned to the client

    @property
    def display(self):
        """Name to show for this client: the host name, else the MAC."""
        return self.hostname if self.hostname else self.mac

    @property
    def is_human(self):
        """Whether this client counts as a person rather than a machine.

        A client without a host name is always treated as human. Otherwise
        it is human unless its host name matches (from the start, via
        ``re.match``) one of the patterns in ``config.computer_hostnames``.
        """
        if not self.hostname:
            return True
        for pattern in config.computer_hostnames:
            if re.match(pattern, self.hostname):
                return False
        return True
@dataclass
class Status:
    """Occupancy status derived from the currently registered clients.

    Instances compare by value (dataclass-generated equality), which is what
    the status-change detection relies on.
    """

    level: int  # numeric severity; higher means more people
    description: str  # short upper-case label, e.g. "POPULATED"
    text: str  # human-readable sentence describing the status
def _get_db(filepath: str):
logging.debug(f"Opening database: {filepath}")
connection = sqlite3.connect(filepath)
c = connection.cursor()
c.execute(
"""CREATE TABLE IF NOT EXISTS spottings (
id INTEGER PRIMARY KEY,
ts INTEGER,
mac TEXT,
hostname TEXT,
ip TEXT
)"""
)
connection.commit()
return connection
def _fetch_leases(db, from_ts: datetime):
output = []
c = db.cursor()
query = c.execute(
"SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC",
(int(from_ts.timestamp()),),
)
for row in query:
output.append(Lease(datetime.fromtimestamp(row[0]), row[1], row[2], row[3]))
return output
# Row types handed to the "index.html" template.
_LogEntry = namedtuple("log_entry", ("ts", "state", "lease"))
_LeaderboardEntry = namedtuple("leaderboard_entry", ("name", "total"))

# Seconds to wait for the webhook endpoint before giving up, so that a
# stalled endpoint cannot block the polling loop indefinitely.
_WEBHOOK_TIMEOUT = 10


def _match_registered_leases(currently_registered, dhcp_leases, ssid, now):
    """Pair registered wireless clients on a matching SSID with DHCP leases.

    Clients without an active DHCP lease are skipped. The result is sorted
    with named clients first (alphabetically, case-insensitive) followed by
    clients without a host name.
    """
    # Keep the first lease per active MAC address, like a linear search would.
    first_lease_by_mac = {}
    for lease in dhcp_leases:
        first_lease_by_mac.setdefault(lease.get("active-mac-address"), lease)

    matched: List[Lease] = []
    for client in currently_registered:
        if ssid not in client["ssid"]:
            continue
        lease = first_lease_by_mac.get(client["mac-address"])
        if lease is None:
            continue
        matched.append(
            Lease(
                ts=now,
                ip=lease["active-address"],
                mac=lease["active-mac-address"],
                hostname=lease.get("host-name"),
            )
        )
    matched.sort(key=lambda lease: (not lease.hostname, (lease.hostname or "").lower()))
    return matched


def _compute_status(registered_leases):
    """Derive the occupancy Status from the currently registered leases."""
    if not registered_leases:
        return Status(
            level=0, description="VOID", text="There are no devices connected?"
        )
    people_cnt = sum(1 for lease in registered_leases if lease.is_human)
    if people_cnt > 4:
        return Status(
            level=2,
            description="FILLED",
            text="There seems to be a lot of people!",
        )
    if people_cnt > 0:
        return Status(
            level=1, description="POPULATED", text="There seem to be people!"
        )
    return Status(level=0, description="EMPTY", text="There are only computers.")


def _notify_webhook(webhook_url, status):
    """Post a status change to the webhook; failures are logged, not raised."""
    try:
        requests.post(
            webhook_url,
            json={
                "text": f"Anabasis is now <b>{status.description}</b>! ({status.text})",
                "format": "html",
                "displayName": "ANABASIS PRESENCE",
            },
            timeout=_WEBHOOK_TIMEOUT,
        )
    except requests.RequestException:
        # A flaky webhook must not take down the monitoring loop.
        logging.exception("Failed to post status change to the webhook.")


def _write_lease_table(path, leases, header=None):
    """Write ip/mac/hostname rows as CSV to *path*, after an optional header."""
    # newline="" is what the csv module requires of files it writes to.
    with open(path, "w", newline="") as file:
        if header is not None:
            file.write(header)
        csv.writer(file).writerows(
            (lease.ip, lease.mac, lease.hostname or "???") for lease in leases
        )


def _find_last_change(leases_newest_first, status):
    """Find the most recent poll whose occupancy differs from *status*.

    Returns a dict with the poll timestamp and the humans present at that
    poll, or None when no such poll exists in the given leases.
    """
    occupied_now = status.level > 0
    for ts, group in groupby(leases_newest_first, key=attrgetter("ts")):
        humans_present = [lease for lease in group if lease.is_human]
        if bool(humans_present) != occupied_now:
            return {"ts": ts, "leases": humans_present}
    return None


def _build_presence_log(leases_oldest_first, collapse_thresh=timedelta(minutes=10)):
    """Build the arrival/departure log, newest entry first.

    Consecutive polls are compared by MAC address: a client that appears is
    logged with ``state=True``, one that disappears with ``state=False``.
    A departure immediately followed in the log by the same client's arrival
    less than *collapse_thresh* later is treated as a blip and both entries
    are dropped.
    """
    log = []
    last_seen = []
    for ts, group in groupby(leases_oldest_first, key=attrgetter("ts")):
        current = list(group)
        previous_macs = {lease.mac for lease in last_seen}
        current_macs = {lease.mac for lease in current}
        log.extend(
            _LogEntry(ts, True, lease)
            for lease in current
            if lease.mac not in previous_macs
        )
        log.extend(
            _LogEntry(ts, False, lease)
            for lease in last_seen
            if lease.mac not in current_macs
        )
        last_seen = current

    # Repeatedly remove the last remaining departure/arrival blip until none
    # is left; removing one pair can make another pair adjacent.
    while True:
        for idx in range(len(log) - 2, -1, -1):
            left, returned = log[idx], log[idx + 1]
            if (
                left.lease.mac == returned.lease.mac
                and not left.state
                and returned.state
                and returned.ts - left.ts < collapse_thresh
            ):
                del log[idx : idx + 2]  # the OUT entry and the IN after it
                break
        else:
            break

    log.reverse()
    return log


def _build_leaderboard(leases_newest_first, size=10):
    """Rank human clients by number of spottings and return the top *size*.

    Each client is labelled with the host name of its most recent human
    spotting, falling back to its MAC address.
    """
    spottings = {}
    mac_to_hostname = {}
    for lease in leases_newest_first:
        if lease.is_human:
            spottings[lease.mac] = spottings.get(lease.mac, 0) + 1
            mac_to_hostname.setdefault(lease.mac, lease.hostname)

    ranked = sorted(spottings.items(), key=lambda item: item[1], reverse=True)
    # NOTE(review): every spotting is counted as one minute, which is only
    # accurate for data recorded with the default --period of 60 seconds.
    return [
        _LeaderboardEntry(
            mac_to_hostname.get(mac) or mac,
            humanize.naturaldelta(timedelta(minutes=count)),
        )
        for mac, count in ranked[:size]
    ]


def _build_page_context(db, now, registered_leases, status):
    """Collect the template variables shared by the public and internal page."""
    return {
        "now": now,
        "leases": registered_leases,
        "status": status,
        "last_change": _find_last_change(
            _fetch_leases(db, now - timedelta(days=7)), status
        ),
        "log": _build_presence_log(
            reversed(_fetch_leases(db, now - timedelta(days=1)))
        ),
        "leaderboard": _build_leaderboard(
            _fetch_leases(db, now - timedelta(days=120))
        ),
    }


def _write_page(jinja_env, path, context, internal):
    """Render "index.html" with *context* and write the result to *path*."""
    # Render before opening so a template error does not truncate the file.
    out_str = jinja_env.get_template("index.html").render(internal=internal, **context)
    with open(path, "w") as file:
        file.write(out_str)


# The docstring of run_forever doubles as the --help text shown by click.
@click.command()
@click.option(
    "--address",
    default=os.getenv("ROUTER_IP", "192.168.42.1"),
    help="Address of the router.",
)
@click.option(
    "--period", default=60, help="How often to check for clients (in seconds)."
)
@click.option(
    "--ssid",
    default=os.getenv("SSID", "anabasis"),
    help="Limit clients to SSID containing this string.",
)
@click.option("--webhook-url", help="Webhook URL to post status changes to.")
@click.option("-o", "--output", multiple=True, help="Output file.")
@click.option("--output-internal", help="Output file (with internal stats).")
def run_forever(
    address: str,
    period: int,
    ssid: str,
    output: tuple,
    output_internal: Optional[str],
    webhook_url: Optional[str],
):
    """Poll the router for connected clients and publish who is present.

    Every poll is recorded in clients.sqlite3. Output files are rewritten on
    each poll; the format is chosen by extension (.csv, .lst or .html).
    The router password is read from the ROUTER_PASSWORD environment variable.
    """
    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
    )
    db = _get_db("clients.sqlite3")
    connection = routeros_api.RouterOsApiPool(
        address,
        username="admin",
        password=os.environ["ROUTER_PASSWORD"],
        plaintext_login=True,
    )
    jinja_env = Environment(
        loader=FileSystemLoader("templates"),
        autoescape=select_autoescape(["html", "xml"]),
    )

    last_status, last_status_change = None, datetime.now()
    while True:
        logging.info(f"Querying router at {address}...")
        api = connection.get_api()
        currently_registered = api.get_resource("/caps-man/registration-table").call(
            "print"
        )
        logging.debug(f"Got {len(currently_registered)} registered clients.")
        dhcp_leases = api.get_resource("/ip/dhcp-server/lease").call("print")
        logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")

        now = datetime.now()
        timestamp = int(now.timestamp())
        registered_leases = _match_registered_leases(
            currently_registered, dhcp_leases, ssid, now
        )
        logging.info(f"Found {len(registered_leases)} registered leases.")
        logging.debug(", ".join([str(lease) for lease in registered_leases]))

        status = _compute_status(registered_leases)

        logging.debug("Logging into the database...")
        db.cursor().executemany(
            "INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)",
            [
                (timestamp, lease.mac, lease.hostname, lease.ip)
                for lease in registered_leases
            ],
        )
        db.commit()

        # A status at the same or a higher level is accepted immediately; a
        # drop to a lower level is only accepted once 30 minutes have passed
        # since the status was last accepted.
        if (
            last_status is None
            or status.level >= last_status.level
            or datetime.now() - last_status_change > timedelta(minutes=30)
        ):
            if webhook_url and last_status != status:
                _notify_webhook(webhook_url, status)
            last_status = status
            last_status_change = datetime.now()

        # The page data is built once per poll and shared by every HTML
        # output, so the internal page no longer depends on a public .html
        # output having been configured.
        page_context = None
        if output_internal or any(path.endswith(".html") for path in output):
            page_context = _build_page_context(db, now, registered_leases, status)

        for output_file in output:
            if output_file.endswith(".csv"):
                logging.debug(f"Outputting CSV file into {output_file}...")
                _write_lease_table(output_file, registered_leases)
            elif output_file.endswith(".lst"):
                logging.debug(f"Outputting LST file into {output_file}...")
                _write_lease_table(output_file, registered_leases, header=f"{now}\n")
            elif output_file.endswith(".html"):
                logging.debug(f"Outputting HTML file into {output_file}...")
                _write_page(jinja_env, output_file, page_context, internal=False)

        if output_internal:
            _write_page(jinja_env, output_internal, page_context, internal=True)

        logging.info(f"Sleeping for {period} seconds.")
        sleep(period)
# Script entry point: run_forever is a click command, so calling it without
# arguments lets click take the options from the command line.
if __name__ == "__main__":
    run_forever()