"""Poll a RouterOS router for registered wireless clients.

Each polling round matches the CAPsMAN registration table against the DHCP
leases, records the matches in a local SQLite database and renders the
requested output files (CSV, LST or HTML).
"""
import csv
import logging
import os
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta
from time import sleep
from typing import Iterable, Iterator, List, Optional, Tuple

import click
import routeros_api
from jinja2 import Environment, select_autoescape, FileSystemLoader

from config import Config


@dataclass
class Lease:
    # One sighting of a client: when it was seen and its DHCP lease data.
    ts: datetime
    mac: str
    hostname: Optional[str]  # taken from the lease's 'host-name', may be absent
    ip: str


@dataclass
class Status:
    # Summary handed to the HTML template: a short code plus a display text.
    status: str
    text: str


def _get_db(filepath: str) -> sqlite3.Connection:
    """Open the SQLite database at *filepath*, creating the table if needed.

    Returns the open connection; the caller owns it.
    """
    logging.debug(f"Opening database: {filepath}")
    connection = sqlite3.connect(filepath)
    c = connection.cursor()
    c.execute(
        '''CREATE TABLE IF NOT EXISTS spottings (
            id INTEGER PRIMARY KEY,
            ts INTEGER,
            mac TEXT,
            hostname TEXT,
            ip TEXT
        )'''
    )
    connection.commit()
    return connection


def _fetch_leases(db, from_ts: datetime) -> Iterator[Lease]:
    """Yield stored sightings newer than *from_ts*, most recent first."""
    c = db.cursor()
    query = c.execute(
        'SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC',
        (int(from_ts.timestamp()),)
    )
    for row in query:
        yield Lease(datetime.fromtimestamp(row[0]), row[1], row[2], row[3])


def _is_computer(hostname: Optional[str], patterns: Iterable[str]) -> bool:
    """Return True if *hostname* matches any regex in *patterns*.

    A missing hostname is treated as the empty string, so it only counts as
    a computer when some pattern matches the empty string.
    """
    return any(re.match(pattern, hostname or "") for pattern in patterns)


def _write_lease_rows(path: str, leases: Iterable[Lease],
                      header: Optional[str] = None) -> None:
    """Write *leases* to *path* as CSV rows of (ip, mac, hostname).

    If *header* is given it is written verbatim before the rows.
    """
    # newline='' is required by the csv module; without it the writer's
    # "\r\n" terminators get translated again on some platforms.
    with open(path, 'w', newline='') as file:
        if header is not None:
            file.write(header)
        writer = csv.writer(file)
        for lease in leases:
            writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))


# NOTE: click uses the function docstring as the command's --help text,
# so it is kept to a single summary line.
@click.command()
@click.option('--address', default=os.getenv("ROUTER_IP", "192.168.42.1"), help='Address of the router.')
@click.option('--period', default=60, help='How often to check for clients (in seconds).')
@click.option('--ssid', default=os.getenv("SSID", "anabasis"), help='Limit clients to SSID containing this string.')
@click.option('-o', '--output', multiple=True, help="Output file.")
def run_forever(address: str, period: int, ssid: str, output: Tuple[str, ...]):
    """Poll the router forever, logging clients and writing output files."""
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - [%(levelname)s] %(message)s')

    # NOTE(review): the Config class itself is used, not an instance;
    # presumably computer_hostnames is a class attribute - confirm.
    config = Config
    db = _get_db("clients.sqlite3")

    # ROUTER_PASSWORD must be set in the environment; a KeyError is raised otherwise.
    connection = routeros_api.RouterOsApiPool(
        address,
        username='admin',
        password=os.environ["ROUTER_PASSWORD"],
        plaintext_login=True
    )

    jinja_env = Environment(
        loader=FileSystemLoader('templates'),
        autoescape=select_autoescape(['html', 'xml'])
    )

    while True:
        logging.info(f"Querying router at {address}...")
        api = connection.get_api()

        currently_registered = api.get_resource('/caps-man/registration-table').call('print')
        logging.debug(f"Got {len(currently_registered)} registered clients.")

        dhcp_leases = api.get_resource('/ip/dhcp-server/lease').call('print')
        logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")

        now = datetime.now()
        timestamp = int(now.timestamp())

        registered_leases: List[Lease] = []
        for client in currently_registered:
            # Entries without an 'ssid' field are skipped instead of raising.
            if ssid not in client.get('ssid', ''):
                continue
            # Pair the wireless client with its active DHCP lease, if any.
            lease = next(
                (
                    candidate for candidate in dhcp_leases
                    if candidate.get('active-mac-address') == client['mac-address']
                ),
                None
            )
            if lease is None:
                continue
            registered_leases.append(
                Lease(ts=now, ip=lease['active-address'], mac=lease['active-mac-address'],
                      hostname=lease.get('host-name'))
            )

        logging.info(f"Found {len(registered_leases)} registered leases.")
        logging.debug(", ".join([str(lease) for lease in registered_leases]))

        if registered_leases:
            # Any device that is not a known computer counts as a person.
            if any(not _is_computer(lease.hostname, config.computer_hostnames)
                   for lease in registered_leases):
                status = Status(status="populated", text="There seem to be people!")
            else:
                status = Status(status="empty", text="There are only computers.")
        else:
            status = Status(status="empty", text="There are no devices connected?")

        logging.debug("Logging into the database...")
        cur = db.cursor()
        cur.executemany("INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)", [
            (timestamp, lease.mac, lease.hostname, lease.ip) for lease in registered_leases
        ])
        db.commit()

        # The file extension selects the output format; others are ignored.
        for output_file in output:
            if output_file.endswith(".csv"):
                logging.debug(f"Outputting CSV file into {output_file}...")
                _write_lease_rows(output_file, registered_leases)
            elif output_file.endswith(".lst"):
                logging.debug(f"Outputting LST file into {output_file}...")
                # Same rows as the CSV output, preceded by a timestamp line.
                _write_lease_rows(output_file, registered_leases, header=f"{now}\n")
            elif output_file.endswith(".html"):
                # Most recent non-computer sighting within the last 24 hours.
                last_human = next(
                    (
                        lease for lease in _fetch_leases(db, now - timedelta(hours=24))
                        if not _is_computer(lease.hostname, config.computer_hostnames)
                    ),
                    None
                )
                logging.debug(f"Outputting HTML file into {output_file}...")
                with open(output_file, 'w') as file:
                    out_str = jinja_env.get_template("index.html").render(
                        now=now,
                        leases=registered_leases,
                        status=status,
                        last_human=last_human
                    )
                    file.write(out_str)

        logging.info(f"Sleeping for {period} seconds.")
        sleep(period)


if __name__ == '__main__':
    run_forever()