anabasis-clients/generate.py

329 lines
11 KiB
Python
Raw Permalink Normal View History

2021-01-14 19:49:54 +01:00
import csv
2021-02-09 20:44:19 +01:00
import logging
import os
2021-02-10 23:07:37 +01:00
import re
2021-02-09 20:44:19 +01:00
import sqlite3
2021-02-11 01:55:15 +01:00
from collections import namedtuple
2021-02-09 20:44:19 +01:00
from dataclasses import dataclass
2021-02-10 23:21:05 +01:00
from datetime import datetime, timedelta
2021-02-11 01:09:40 +01:00
from itertools import groupby
from operator import attrgetter
2021-02-09 20:44:19 +01:00
from time import sleep
from typing import List, Optional
import click
2021-02-11 02:14:06 +01:00
import humanize
2021-02-22 23:39:46 +01:00
import requests
2021-02-09 20:44:19 +01:00
import routeros_api
2021-02-10 23:07:37 +01:00
from jinja2 import Environment, select_autoescape, FileSystemLoader
from config import Config
2021-02-09 20:44:19 +01:00
2021-02-11 01:09:40 +01:00
config = Config
@dataclass
class Lease:
ts: datetime
mac: str
hostname: Optional[str]
ip: str
@property
def display(self):
return self.hostname or self.mac
2022-01-17 15:10:35 +01:00
@property
def is_human(self):
if self.hostname:
return not any(
re.match(ch, self.hostname) for ch in config.computer_hostnames
)
else:
return True
2021-02-11 01:09:40 +01:00
@dataclass
class Status:
level: int
2021-02-11 02:08:59 +01:00
description: str
2021-02-11 01:09:40 +01:00
text: str
2021-02-09 20:44:19 +01:00
def _get_db(filepath: str):
logging.debug(f"Opening database: {filepath}")
connection = sqlite3.connect(filepath)
c = connection.cursor()
c.execute(
2022-01-03 20:11:51 +01:00
"""CREATE TABLE IF NOT EXISTS spottings (
2021-02-09 20:44:19 +01:00
id INTEGER PRIMARY KEY,
ts INTEGER,
mac TEXT,
hostname TEXT,
ip TEXT
2022-01-03 20:11:51 +01:00
)"""
2021-02-09 20:44:19 +01:00
)
connection.commit()
return connection
2021-02-10 23:21:05 +01:00
def _fetch_leases(db, from_ts: datetime):
2021-02-11 01:55:15 +01:00
output = []
2021-02-10 23:21:05 +01:00
c = db.cursor()
query = c.execute(
2022-01-03 20:11:51 +01:00
"SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC",
(int(from_ts.timestamp()),),
2021-02-10 23:21:05 +01:00
)
for row in query:
2021-02-11 01:55:15 +01:00
output.append(Lease(datetime.fromtimestamp(row[0]), row[1], row[2], row[3]))
return output
2021-02-10 23:21:05 +01:00
2021-02-09 20:44:19 +01:00
@click.command()
2022-01-03 20:11:51 +01:00
@click.option(
"--address",
default=os.getenv("ROUTER_IP", "192.168.42.1"),
help="Address of the router.",
)
@click.option(
"--period", default=60, help="How often to check for clients (in seconds)."
)
@click.option(
"--ssid",
default=os.getenv("SSID", "anabasis"),
help="Limit clients to SSID containing this string.",
)
@click.option("--webhook-url", help="Webhook URL to post status changes to.")
@click.option("-o", "--output", multiple=True, help="Output file.")
@click.option("--output-internal", help="Output file (with internal stats).")
2022-01-03 20:11:51 +01:00
def run_forever(
address: str,
period: int,
ssid: str,
output: str,
output_internal: Optional[str],
webhook_url: Optional[str],
2022-01-03 20:11:51 +01:00
):
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
)
2021-02-09 20:44:19 +01:00
db = _get_db("clients.sqlite3")
connection = routeros_api.RouterOsApiPool(
address,
2022-01-03 20:11:51 +01:00
username="admin",
2021-02-09 20:44:19 +01:00
password=os.environ["ROUTER_PASSWORD"],
2022-01-03 20:11:51 +01:00
plaintext_login=True,
2021-02-09 20:44:19 +01:00
)
2021-02-10 23:07:37 +01:00
jinja_env = Environment(
2022-01-03 20:11:51 +01:00
loader=FileSystemLoader("templates"),
autoescape=select_autoescape(["html", "xml"]),
2021-02-10 23:07:37 +01:00
)
last_status, last_status_change = None, datetime.now()
2021-02-22 23:39:46 +01:00
2021-02-09 20:44:19 +01:00
while True:
logging.info(f"Querying router at {address}...")
api = connection.get_api()
2022-01-03 20:11:51 +01:00
currently_registered = api.get_resource("/caps-man/registration-table").call(
"print"
)
2021-02-09 20:44:19 +01:00
logging.debug(f"Got {len(currently_registered)} registered clients.")
2022-01-03 20:11:51 +01:00
dhcp_leases = api.get_resource("/ip/dhcp-server/lease").call("print")
2021-02-09 20:44:19 +01:00
logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")
2021-02-10 23:21:05 +01:00
now = datetime.now()
timestamp = int(now.timestamp())
2021-02-09 20:44:19 +01:00
registered_leases: List[Lease] = []
2022-01-03 20:11:51 +01:00
for client in filter(lambda c: ssid in c["ssid"], currently_registered):
2021-02-09 20:44:19 +01:00
try:
lease = next(
2022-01-03 20:11:51 +01:00
lease
for lease in dhcp_leases
if lease.get("active-mac-address") == client["mac-address"]
2021-02-09 20:44:19 +01:00
)
except StopIteration:
continue
registered_leases.append(
2022-01-03 20:11:51 +01:00
Lease(
ts=now,
ip=lease["active-address"],
mac=lease["active-mac-address"],
hostname=lease.get("host-name"),
)
2021-02-09 20:44:19 +01:00
)
2021-02-11 01:09:40 +01:00
registered_leases.sort(key=lambda l: (l.hostname or "").lower())
registered_leases.sort(key=lambda l: not bool(l.hostname))
2021-02-09 20:44:19 +01:00
logging.info(f"Found {len(registered_leases)} registered leases.")
logging.debug(", ".join([str(lease) for lease in registered_leases]))
2021-02-10 23:07:37 +01:00
if len(registered_leases) > 0:
2022-01-17 15:10:35 +01:00
people_cnt = len([lease for lease in registered_leases if lease.is_human])
2021-02-24 22:07:01 +01:00
if people_cnt > 4:
2022-01-03 20:11:51 +01:00
status = Status(
level=2,
description="FILLED",
text="There seems to be a lot of people!",
)
2021-02-11 02:08:59 +01:00
elif people_cnt > 0:
2022-01-03 20:11:51 +01:00
status = Status(
level=1, description="POPULATED", text="There seem to be people!"
)
2021-02-10 23:07:37 +01:00
else:
2022-01-03 20:11:51 +01:00
status = Status(
level=0, description="EMPTY", text="There are only computers."
)
2021-02-10 23:07:37 +01:00
else:
2022-01-03 20:11:51 +01:00
status = Status(
level=0, description="VOID", text="There are no devices connected?"
)
2021-02-10 23:07:37 +01:00
2021-02-09 20:44:19 +01:00
logging.debug("Logging into the database...")
cur = db.cursor()
2022-01-03 20:11:51 +01:00
cur.executemany(
"INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)",
[
(timestamp, lease.mac, lease.hostname, lease.ip)
for lease in registered_leases
],
)
2021-02-09 20:44:19 +01:00
db.commit()
2022-01-03 20:11:51 +01:00
if (
not last_status
or status.level >= last_status.level
or datetime.now() - last_status_change > timedelta(minutes=30)
):
if webhook_url and last_status != status:
2022-01-03 20:11:51 +01:00
requests.post(
webhook_url,
json={
"text": f"Anabasis is now <b>{status.description}</b>! ({status.text})",
"format": "html",
"displayName": "ANABASIS PRESENCE",
},
)
last_status = status
last_status_change = datetime.now()
2021-02-22 23:39:46 +01:00
2021-02-10 23:07:37 +01:00
for output_file in output:
if output_file.endswith(".csv"):
logging.debug(f"Outputting CSV file into {output_file}...")
2022-01-03 20:11:51 +01:00
with open(output_file, "w") as file:
2021-02-10 23:07:37 +01:00
writer = csv.writer(file)
for lease in registered_leases:
writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))
elif output_file.endswith(".lst"):
logging.debug(f"Outputting LST file into {output_file}...")
2022-01-03 20:11:51 +01:00
with open(output_file, "w") as file:
2021-02-10 23:07:37 +01:00
file.write(f"{now}\n")
writer = csv.writer(file)
for lease in registered_leases:
writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))
elif output_file.endswith(".html"):
2021-02-11 01:09:40 +01:00
last_change = None
2022-01-03 20:11:51 +01:00
for ts, leases in groupby(
_fetch_leases(db, now - timedelta(days=7)), key=attrgetter("ts")
):
2022-01-17 15:10:35 +01:00
humans_present = [lease for lease in leases if lease.is_human]
2021-04-19 23:45:54 +02:00
if (len(humans_present) > 0) != (status.level > 0):
2022-01-03 20:11:51 +01:00
last_change = {"ts": ts, "leases": humans_present}
2021-02-11 01:09:40 +01:00
break
2021-02-10 23:21:05 +01:00
2022-01-03 20:11:51 +01:00
log_entry = namedtuple("log_entry", ("ts", "state", "lease"))
2021-02-11 01:55:15 +01:00
log = []
last_seen = []
2022-01-03 20:11:51 +01:00
for ts, leases in groupby(
reversed(_fetch_leases(db, now - timedelta(days=1))),
key=attrgetter("ts"),
):
2021-02-11 01:55:15 +01:00
leases = list(leases)
for lease in leases:
if lease.mac not in (l.mac for l in last_seen):
log.append(log_entry(ts, True, lease))
for lease in last_seen:
if lease.mac not in (l.mac for l in leases):
log.append(log_entry(ts, False, lease))
last_seen = leases
collapse_thresh = timedelta(minutes=10)
while True:
duplicate_index = None
for idx in range(len(log)):
if idx + 1 == len(log):
continue
2022-01-03 20:11:51 +01:00
if (
log[idx].lease.mac == log[idx + 1].lease.mac
and not log[idx].state
and log[idx + 1].state
and log[idx + 1].ts - log[idx].ts < collapse_thresh
):
duplicate_index = idx
if duplicate_index is None:
break
log.pop(duplicate_index) # IN
log.pop(duplicate_index) # OUT
2021-02-11 01:55:15 +01:00
log.reverse()
2021-02-11 02:06:30 +01:00
leaderboard_tmp = {}
mac_to_hostname = {}
for lease in _fetch_leases(db, now - timedelta(days=120)):
2022-01-17 15:10:35 +01:00
if lease.is_human:
2021-02-11 02:06:30 +01:00
leaderboard_tmp.setdefault(lease.mac, 0)
leaderboard_tmp[lease.mac] += 1
mac_to_hostname.setdefault(lease.mac, lease.hostname)
2022-01-03 20:11:51 +01:00
leaderboard_entry = namedtuple("leaderboard_entry", ("name", "total"))
2021-02-11 02:06:30 +01:00
leaderboard = []
2022-01-03 20:11:51 +01:00
for mac, minutes in sorted(
leaderboard_tmp.items(), key=lambda t: t[1], reverse=True
):
2021-02-11 02:14:06 +01:00
leaderboard.append(
leaderboard_entry(
mac_to_hostname.get(mac) or mac,
2022-01-03 20:11:51 +01:00
humanize.naturaldelta(timedelta(minutes=minutes)),
2021-02-11 02:14:06 +01:00
)
)
2021-02-11 02:06:30 +01:00
leaderboard = leaderboard[:10]
2021-02-10 23:07:37 +01:00
logging.debug(f"Outputting HTML file into {output_file}...")
2022-01-03 20:11:51 +01:00
with open(output_file, "w") as file:
2021-02-10 23:07:37 +01:00
out_str = jinja_env.get_template("index.html").render(
now=now,
leases=registered_leases,
2021-02-10 23:21:05 +01:00
status=status,
2021-02-11 01:55:15 +01:00
last_change=last_change,
2021-02-11 02:06:30 +01:00
log=log,
2022-01-03 20:11:51 +01:00
leaderboard=leaderboard,
internal=False,
2021-02-10 23:07:37 +01:00
)
file.write(out_str)
2021-02-09 20:44:19 +01:00
if output_internal:
with open(output_internal, "w") as file:
out_str = jinja_env.get_template("index.html").render(
now=now,
leases=registered_leases,
status=status,
last_change=last_change,
log=log,
leaderboard=leaderboard,
internal=True,
)
file.write(out_str)
2021-02-09 20:44:19 +01:00
logging.info(f"Sleeping for {period} seconds.")
sleep(period)
2021-01-14 19:49:54 +01:00
2022-01-03 20:11:51 +01:00
if __name__ == "__main__":
2021-02-09 20:44:19 +01:00
run_forever()