"""Poll a RouterOS router for wireless clients and publish presence reports.

Each polling cycle reads the router's ``/caps-man/registration-table`` and
``/ip/dhcp-server/lease`` resources, stores the matched clients in a local
SQLite database, optionally posts status changes to a webhook, and writes
CSV / LST / HTML output files.
"""

import csv
import logging
import os
import re
import sqlite3
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import groupby
from operator import attrgetter
from time import sleep
from typing import List, Optional, Tuple

import click
import humanize
import requests
import routeros_api
from jinja2 import Environment, select_autoescape, FileSystemLoader

from config import Config

# NOTE(review): this binds the Config class itself, not an instance --
# presumably its settings are class attributes; confirm against config.py.
config = Config

# Seconds to wait for the webhook before giving up, so that a stalled
# endpoint cannot block the polling loop indefinitely.
_WEBHOOK_TIMEOUT = 10

# Records handed to the HTML template. The namedtuple type names are kept
# as-is so their repr does not change.
_LogEntry = namedtuple("log_entry", ("ts", "state", "lease"))
_LeaderboardEntry = namedtuple("leaderboard_entry", ("name", "total"))


@dataclass
class Lease:
    """A single sighting of a client on the network."""

    ts: datetime  # when the client was seen
    mac: str
    hostname: Optional[str]  # DHCP host name, if the client sent one
    ip: str

    @property
    def display(self):
        """Return the hostname when known, otherwise the MAC address."""
        return self.hostname or self.mac

    @property
    def is_human(self):
        """Return False only if the hostname matches a known computer pattern.

        Clients without a hostname are counted as human. Patterns come from
        ``config.computer_hostnames`` and are applied with ``re.match``, i.e.
        anchored at the start of the hostname.
        """
        if not self.hostname:
            return True
        return not any(
            re.match(pattern, self.hostname)
            for pattern in config.computer_hostnames
        )


@dataclass
class Status:
    """Occupancy status derived from the currently registered clients."""

    level: int  # 0 = nobody, 1 = some people, 2 = many people
    description: str  # short label, e.g. "POPULATED"
    text: str  # human-readable sentence


def _get_db(filepath: str):
    """Open the SQLite database at *filepath*, creating the table if needed.

    Returns the open ``sqlite3`` connection.
    """
    logging.debug(f"Opening database: {filepath}")
    connection = sqlite3.connect(filepath)
    c = connection.cursor()
    c.execute(
        """CREATE TABLE IF NOT EXISTS spottings (
            id INTEGER PRIMARY KEY,
            ts INTEGER,
            mac TEXT,
            hostname TEXT,
            ip TEXT
        )"""
    )
    connection.commit()
    return connection


def _fetch_leases(db, from_ts: datetime):
    """Return all spottings newer than *from_ts*, most recent first."""
    cursor = db.cursor()
    rows = cursor.execute(
        "SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC",
        (int(from_ts.timestamp()),),
    )
    return [
        Lease(datetime.fromtimestamp(ts), mac, hostname, ip)
        for ts, mac, hostname, ip in rows
    ]


def _match_registered_leases(
    registrations, dhcp_leases, ssid: str, now: datetime
) -> List[Lease]:
    """Build a Lease for every registered client on *ssid* with a DHCP lease.

    Clients whose MAC has no active DHCP lease are skipped. The result is
    ordered with named hosts first (case-insensitively by hostname), followed
    by hosts without a hostname.
    """
    leases: List[Lease] = []
    for client in registrations:
        if ssid not in client["ssid"]:
            continue
        match = next(
            (
                lease
                for lease in dhcp_leases
                if lease.get("active-mac-address") == client["mac-address"]
            ),
            None,
        )
        if match is None:
            continue
        leases.append(
            Lease(
                ts=now,
                ip=match["active-address"],
                mac=match["active-mac-address"],
                hostname=match.get("host-name"),
            )
        )
    leases.sort(key=lambda lease: (not lease.hostname, (lease.hostname or "").lower()))
    return leases


def _determine_status(leases: List[Lease]) -> Status:
    """Classify occupancy from the number of human-looking clients."""
    if not leases:
        return Status(
            level=0, description="VOID", text="There are no devices connected?"
        )
    people_cnt = sum(1 for lease in leases if lease.is_human)
    if people_cnt > 4:
        return Status(
            level=2,
            description="FILLED",
            text="There seems to be a lot of people!",
        )
    if people_cnt > 0:
        return Status(
            level=1, description="POPULATED", text="There seem to be people!"
        )
    return Status(level=0, description="EMPTY", text="There are only computers.")


def _store_leases(db, timestamp: int, leases: List[Lease]) -> None:
    """Insert one spotting row per lease, all stamped with *timestamp*."""
    cur = db.cursor()
    cur.executemany(
        "INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)",
        [(timestamp, lease.mac, lease.hostname, lease.ip) for lease in leases],
    )
    db.commit()


def _notify_webhook(webhook_url: str, status: Status) -> None:
    """Post *status* to the webhook; failures are logged, never raised.

    The notification is best-effort: a network error or a stalled endpoint
    must not take down the polling loop.
    """
    try:
        requests.post(
            webhook_url,
            json={
                "text": f"Anabasis is now {status.description}! ({status.text})",
                "format": "html",
                "displayName": "ANABASIS PRESENCE",
            },
            timeout=_WEBHOOK_TIMEOUT,
        )
    except requests.RequestException:
        logging.exception("Failed to post status change to the webhook.")


def _write_lease_rows(file, leases: List[Lease]) -> None:
    """Write one CSV row (ip, mac, hostname or "???") per lease to *file*."""
    writer = csv.writer(file)
    for lease in leases:
        writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))


def _find_last_change(db, now: datetime, status: Status):
    """Find the newest spotting in the last 7 days whose occupancy differs.

    Returns a dict with the timestamp and the humans present at that time,
    or None if occupancy matched the current status throughout.
    """
    for ts, leases in groupby(
        _fetch_leases(db, now - timedelta(days=7)), key=attrgetter("ts")
    ):
        humans_present = [lease for lease in leases if lease.is_human]
        if bool(humans_present) != (status.level > 0):
            return {"ts": ts, "leases": humans_present}
    return None


def _collapse_short_absences(log, threshold: timedelta = timedelta(minutes=10)):
    """Remove adjacent leave/return pairs of one MAC closer than *threshold*.

    Works in place. The last qualifying pair is removed first and the scan is
    repeated, because a removal can make a new pair adjacent.
    """
    while True:
        duplicate_index = None
        # Scan backwards and stop at the first hit: that is the last match.
        for idx in range(len(log) - 2, -1, -1):
            departed, returned = log[idx], log[idx + 1]
            if (
                departed.lease.mac == returned.lease.mac
                and not departed.state
                and returned.state
                and returned.ts - departed.ts < threshold
            ):
                duplicate_index = idx
                break
        if duplicate_index is None:
            return
        del log[duplicate_index : duplicate_index + 2]


def _build_presence_log(db, now: datetime):
    """Return arrival/departure events of the last day, newest first.

    Consecutive spotting timestamps are compared: a MAC that appears is logged
    with ``state=True``, one that disappears with ``state=False``.
    """
    log = []
    last_seen: List[Lease] = []
    for ts, group in groupby(
        reversed(_fetch_leases(db, now - timedelta(days=1))),
        key=attrgetter("ts"),
    ):
        current = list(group)
        previous_macs = {lease.mac for lease in last_seen}
        current_macs = {lease.mac for lease in current}
        log.extend(
            _LogEntry(ts, True, lease)
            for lease in current
            if lease.mac not in previous_macs
        )
        log.extend(
            _LogEntry(ts, False, lease)
            for lease in last_seen
            if lease.mac not in current_macs
        )
        last_seen = current
    _collapse_short_absences(log)
    log.reverse()
    return log


def _build_leaderboard(db, now: datetime):
    """Return the ten most frequently seen human clients of the last 120 days.

    Every spotting is counted as one minute, which matches the default
    60-second polling period.
    NOTE(review): with a different --period the totals are off by that
    factor -- confirm whether they should be scaled.
    """
    spottings = {}
    mac_to_hostname = {}
    for lease in _fetch_leases(db, now - timedelta(days=120)):
        if lease.is_human:
            spottings[lease.mac] = spottings.get(lease.mac, 0) + 1
            # Rows arrive newest first, so this keeps the latest hostname.
            mac_to_hostname.setdefault(lease.mac, lease.hostname)
    ranked = sorted(spottings.items(), key=lambda item: item[1], reverse=True)
    return [
        _LeaderboardEntry(
            mac_to_hostname.get(mac) or mac,
            humanize.naturaldelta(timedelta(minutes=count)),
        )
        for mac, count in ranked[:10]
    ]


def _render_page(jinja_env, path: str, context: dict, internal: bool) -> None:
    """Render the ``index.html`` template with *context* into *path*."""
    with open(path, "w") as file:
        out_str = jinja_env.get_template("index.html").render(
            internal=internal, **context
        )
        file.write(out_str)


# The docstring below doubles as the command's --help text, so it is kept to
# a single sentence.
@click.command()
@click.option(
    "--address",
    default=os.getenv("ROUTER_IP", "192.168.42.1"),
    help="Address of the router.",
)
@click.option(
    "--period", default=60, help="How often to check for clients (in seconds)."
)
@click.option(
    "--ssid",
    default=os.getenv("SSID", "anabasis"),
    help="Limit clients to SSID containing this string.",
)
@click.option("--webhook-url", help="Webhook URL to post status changes to.")
@click.option("-o", "--output", multiple=True, help="Output file.")
@click.option("--output-internal", help="Output file (with internal stats).")
def run_forever(
    address: str,
    period: int,
    ssid: str,
    output: Tuple[str, ...],
    output_internal: Optional[str],
    webhook_url: Optional[str],
):
    """Poll the router forever, recording clients and publishing presence."""
    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
    )

    db = _get_db("clients.sqlite3")

    # Raises KeyError when ROUTER_PASSWORD is not set in the environment.
    connection = routeros_api.RouterOsApiPool(
        address,
        username="admin",
        password=os.environ["ROUTER_PASSWORD"],
        plaintext_login=True,
    )

    jinja_env = Environment(
        loader=FileSystemLoader("templates"),
        autoescape=select_autoescape(["html", "xml"]),
    )

    last_status: Optional[Status] = None
    last_status_change = datetime.now()
    while True:
        logging.info(f"Querying router at {address}...")
        api = connection.get_api()
        currently_registered = api.get_resource("/caps-man/registration-table").call(
            "print"
        )
        logging.debug(f"Got {len(currently_registered)} registered clients.")
        dhcp_leases = api.get_resource("/ip/dhcp-server/lease").call("print")
        logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")

        now = datetime.now()
        registered_leases = _match_registered_leases(
            currently_registered, dhcp_leases, ssid, now
        )
        logging.info(f"Found {len(registered_leases)} registered leases.")
        logging.debug(", ".join(str(lease) for lease in registered_leases))

        status = _determine_status(registered_leases)

        logging.debug("Logging into the database...")
        _store_leases(db, int(now.timestamp()), registered_leases)

        # An equal or higher level is accepted immediately; a lower level is
        # accepted only once 30 minutes have passed since the last accepted
        # status, so brief drops do not trigger notifications.
        if (
            last_status is None
            or status.level >= last_status.level
            or datetime.now() - last_status_change > timedelta(minutes=30)
        ):
            if webhook_url and last_status != status:
                _notify_webhook(webhook_url, status)
            last_status = status
            last_status_change = datetime.now()

        # Output files are dispatched on extension; unknown ones are ignored.
        for output_file in output:
            if output_file.endswith(".csv"):
                logging.debug(f"Outputting CSV file into {output_file}...")
                # newline="" lets the csv module control line endings itself.
                with open(output_file, "w", newline="") as file:
                    _write_lease_rows(file, registered_leases)
            elif output_file.endswith(".lst"):
                logging.debug(f"Outputting LST file into {output_file}...")
                with open(output_file, "w", newline="") as file:
                    file.write(f"{now}\n")
                    _write_lease_rows(file, registered_leases)
            elif output_file.endswith(".html"):
                context = {
                    "now": now,
                    "leases": registered_leases,
                    "status": status,
                    "last_change": _find_last_change(db, now, status),
                    "log": _build_presence_log(db, now),
                    "leaderboard": _build_leaderboard(db, now),
                }

                logging.debug(f"Outputting HTML file into {output_file}...")
                _render_page(jinja_env, output_file, context, internal=False)

                # The internal variant reuses the context computed for this
                # page, so it is written only alongside an .html output.
                if output_internal:
                    _render_page(jinja_env, output_internal, context, internal=True)

        logging.info(f"Sleeping for {period} seconds.")
        sleep(period)


if __name__ == "__main__":
    run_forever()