328 lines
11 KiB
Python
328 lines
11 KiB
Python
import csv
|
|
import logging
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
from collections import namedtuple
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from itertools import groupby
|
|
from operator import attrgetter
|
|
from time import sleep
|
|
from typing import List, Optional
|
|
|
|
import click
|
|
import humanize
|
|
import requests
|
|
import routeros_api
|
|
from jinja2 import Environment, select_autoescape, FileSystemLoader
|
|
|
|
from config import Config
|
|
|
|
# Module-wide settings; Lease.is_human reads `config.computer_hostnames` from it.
# NOTE(review): this binds the Config class itself rather than an instance —
# confirm that its settings are meant to be read as class attributes.
config = Config
|
|
|
|
|
|
@dataclass
class Lease:
    """One sighting of a client: its DHCP lease data at a given moment."""

    ts: datetime  # moment the client was observed
    mac: str
    hostname: Optional[str]  # None when the lease carries no host name
    ip: str

    @property
    def display(self):
        """Label for the client: the hostname when set, otherwise the MAC."""
        if self.hostname:
            return self.hostname
        return self.mac

    @property
    def is_human(self):
        """Whether the client counts as a person's device.

        A lease without a hostname is treated as human. Otherwise it is
        human unless the hostname matches (from its start, per ``re.match``)
        one of the patterns in ``config.computer_hostnames``.
        """
        if not self.hostname:
            return True
        for pattern in config.computer_hostnames:
            if re.match(pattern, self.hostname):
                return False
        return True
|
|
|
|
|
|
@dataclass
class Status:
    """Occupancy state derived from the currently connected clients."""

    level: int  # numeric severity; higher means more people present
    description: str  # short upper-case label, e.g. "EMPTY" or "FILLED"
    text: str  # full sentence describing the state
|
|
|
|
|
|
def _get_db(filepath: str):
    """Open the SQLite database at *filepath* and make sure the schema exists.

    Returns the open ``sqlite3`` connection. The ``spottings`` table is
    created on first use; an existing table is left untouched.
    """
    logging.debug(f"Opening database: {filepath}")

    create_spottings = """CREATE TABLE IF NOT EXISTS spottings (
        id INTEGER PRIMARY KEY,
        ts INTEGER,
        mac TEXT,
        hostname TEXT,
        ip TEXT
    )"""

    db = sqlite3.connect(filepath)
    db.cursor().execute(create_spottings)
    db.commit()
    return db
|
|
|
|
|
|
def _fetch_leases(db, from_ts: datetime):
    """Load the spottings recorded strictly after *from_ts*, newest first.

    *db* is an open connection holding the ``spottings`` table. Each row
    is returned as a ``Lease`` whose ``ts`` is rebuilt from the stored
    integer timestamp.
    """
    since = int(from_ts.timestamp())
    rows = db.cursor().execute(
        "SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC",
        (since,),
    )
    return [
        Lease(datetime.fromtimestamp(ts), mac, hostname, ip)
        for ts, mac, hostname, ip in rows
    ]
|
|
|
|
|
|
# Row types handed to the "index.html" template.
_LogEntry = namedtuple("log_entry", ("ts", "state", "lease"))
_LeaderboardEntry = namedtuple("leaderboard_entry", ("name", "total"))

# Seconds to wait for the webhook before giving up on a notification.
_WEBHOOK_TIMEOUT = 10


def _match_registered_leases(currently_registered, dhcp_leases, ssid, now):
    """Pair wireless clients on a matching SSID with their active DHCP lease.

    Clients without an active lease are skipped. The result is sorted by
    lower-cased hostname, with hostname-less leases placed last.
    """
    matched: List[Lease] = []
    for client in currently_registered:
        if ssid not in client["ssid"]:
            continue
        dhcp_lease = next(
            (
                candidate
                for candidate in dhcp_leases
                if candidate.get("active-mac-address") == client["mac-address"]
            ),
            None,
        )
        if dhcp_lease is None:
            continue
        matched.append(
            Lease(
                ts=now,
                ip=dhcp_lease["active-address"],
                mac=dhcp_lease["active-mac-address"],
                hostname=dhcp_lease.get("host-name"),
            )
        )
    matched.sort(key=lambda lease: (not lease.hostname, (lease.hostname or "").lower()))
    return matched


def _derive_status(registered_leases):
    """Translate the connected leases into an occupancy ``Status``."""
    if not registered_leases:
        return Status(
            level=0, description="VOID", text="There are no devices connected?"
        )
    people_cnt = sum(1 for lease in registered_leases if lease.is_human)
    if people_cnt > 4:
        return Status(
            level=2,
            description="FILLED",
            text="There seems to be a lot of people!",
        )
    if people_cnt > 0:
        return Status(level=1, description="POPULATED", text="There seem to be people!")
    return Status(level=0, description="EMPTY", text="There are only computers.")


def _store_spottings(db, timestamp, leases):
    """Append one ``spottings`` row per lease, all stamped with *timestamp*."""
    db.cursor().executemany(
        "INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)",
        [(timestamp, lease.mac, lease.hostname, lease.ip) for lease in leases],
    )
    db.commit()


def _post_status(webhook_url, status):
    """Announce *status* on the webhook; failures are logged, never raised."""
    try:
        requests.post(
            webhook_url,
            json={
                "text": f"Anabasis is now <b>{status.description}</b>! ({status.text})",
                "format": "html",
                "displayName": "ANABASIS PRESENCE",
            },
            # Without a timeout a stalled endpoint would block polling forever.
            timeout=_WEBHOOK_TIMEOUT,
        )
    except requests.RequestException:
        logging.exception("Failed to post the status change to the webhook.")


def _write_lease_table(path, leases, header=None):
    """Write *leases* as CSV rows to *path*, after an optional header line."""
    # newline="" is what the csv module expects from files it writes to.
    with open(path, "w", newline="") as file:
        if header is not None:
            file.write(f"{header}\n")
        csv.writer(file).writerows(
            (lease.ip, lease.mac, lease.hostname or "???") for lease in leases
        )


def _find_last_change(db, now, status):
    """Find the newest snapshot of the past week that contradicts *status*.

    Returns ``{"ts": ..., "leases": ...}`` for the most recent timestamp
    whose "humans present" flag differs from the current one, else ``None``.
    """
    snapshots = groupby(
        _fetch_leases(db, now - timedelta(days=7)), key=attrgetter("ts")
    )
    for ts, leases in snapshots:
        humans_present = [lease for lease in leases if lease.is_human]
        if bool(humans_present) != (status.level > 0):
            return {"ts": ts, "leases": humans_present}
    return None


def _collapse_reconnects(log, threshold):
    """Drop, in place, leave/arrive pairs of one MAC closer than *threshold*."""
    while True:
        # Scan from the end so the last qualifying pair is removed first.
        for idx in range(len(log) - 2, -1, -1):
            left, arrived = log[idx], log[idx + 1]
            if (
                left.lease.mac == arrived.lease.mac
                and not left.state
                and arrived.state
                and arrived.ts - left.ts < threshold
            ):
                # Removes the departure (idx) and the re-arrival (idx + 1).
                del log[idx : idx + 2]
                break
        else:
            return


def _build_presence_log(db, now):
    """Build the arrival/departure log of the past day, newest entry first."""
    log = []
    last_seen = []
    snapshots = groupby(
        reversed(_fetch_leases(db, now - timedelta(days=1))), key=attrgetter("ts")
    )
    for ts, group in snapshots:
        leases = list(group)
        previous_macs = {lease.mac for lease in last_seen}
        current_macs = {lease.mac for lease in leases}
        log.extend(
            _LogEntry(ts, True, lease)
            for lease in leases
            if lease.mac not in previous_macs
        )
        log.extend(
            _LogEntry(ts, False, lease)
            for lease in last_seen
            if lease.mac not in current_macs
        )
        last_seen = leases

    _collapse_reconnects(log, timedelta(minutes=10))
    log.reverse()
    return log


def _build_leaderboard(db, now):
    """Rank the ten most-seen human devices of the past 120 days."""
    spottings = {}
    names = {}
    for lease in _fetch_leases(db, now - timedelta(days=120)):
        if lease.is_human:
            spottings[lease.mac] = spottings.get(lease.mac, 0) + 1
            # Rows arrive newest first, so the most recent hostname is kept.
            names.setdefault(lease.mac, lease.hostname)

    ranked = sorted(spottings.items(), key=lambda item: item[1], reverse=True)[:10]
    # NOTE(review): each spotting is counted as one minute, which only
    # matches the default 60 second polling period.
    return [
        _LeaderboardEntry(
            names.get(mac) or mac,
            humanize.naturaldelta(timedelta(minutes=count)),
        )
        for mac, count in ranked
    ]


def _render_report(jinja_env, path, context, internal):
    """Render "index.html" with *context* and write the result to *path*."""
    # Render before opening so a template error cannot leave an empty file.
    out_str = jinja_env.get_template("index.html").render(internal=internal, **context)
    with open(path, "w") as file:
        file.write(out_str)


@click.command()
@click.option(
    "--address",
    default=os.getenv("ROUTER_IP", "192.168.42.1"),
    help="Address of the router.",
)
@click.option(
    "--period", default=60, help="How often to check for clients (in seconds)."
)
@click.option(
    "--ssid",
    default=os.getenv("SSID", "anabasis"),
    help="Limit clients to SSID containing this string.",
)
@click.option("--webhook-url", help="Webhook URL to post status changes to.")
@click.option("-o", "--output", multiple=True, help="Output file.")
@click.option("--output-internal", help="Output file (with internal stats).")
def run_forever(
    address: str,
    period: int,
    ssid: str,
    output: tuple,
    output_internal: Optional[str],
    webhook_url: Optional[str],
):
    """Poll the router forever and publish who is currently connected.

    \f
    Everything after the form feed is developer documentation that click
    leaves out of ``--help``.

    Each cycle queries the router for registered wireless clients and DHCP
    leases, stores the matched leases in ``clients.sqlite3``, derives an
    occupancy status, optionally announces status changes on the webhook,
    writes the requested output files and then sleeps for *period* seconds.

    Args:
        address: Router address to connect to.
        period: Seconds to sleep between two polls.
        ssid: Only clients whose SSID contains this string are considered.
        output: Paths to write; the suffix (``.csv``, ``.lst``, ``.html``)
            selects the format and other suffixes are ignored. click passes
            a tuple because the option is declared with ``multiple=True``.
        output_internal: Path for the HTML report rendered with
            ``internal=True``. It is written whenever it is set, whether or
            not *output* contains an ``.html`` entry.
        webhook_url: Endpoint notified when the announced status changes.

    Raises:
        KeyError: If the ``ROUTER_PASSWORD`` environment variable is unset.
    """
    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
    )
    db = _get_db("clients.sqlite3")

    connection = routeros_api.RouterOsApiPool(
        address,
        username="admin",
        password=os.environ["ROUTER_PASSWORD"],
        plaintext_login=True,
    )

    jinja_env = Environment(
        loader=FileSystemLoader("templates"),
        autoescape=select_autoescape(["html", "xml"]),
    )

    last_status, last_status_change = None, datetime.now()

    while True:
        logging.info(f"Querying router at {address}...")
        api = connection.get_api()

        currently_registered = api.get_resource("/caps-man/registration-table").call(
            "print"
        )
        logging.debug(f"Got {len(currently_registered)} registered clients.")
        dhcp_leases = api.get_resource("/ip/dhcp-server/lease").call("print")
        logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")

        now = datetime.now()
        timestamp = int(now.timestamp())

        registered_leases = _match_registered_leases(
            currently_registered, dhcp_leases, ssid, now
        )
        logging.info(f"Found {len(registered_leases)} registered leases.")
        logging.debug(", ".join(str(lease) for lease in registered_leases))

        status = _derive_status(registered_leases)

        logging.debug("Logging into the database...")
        _store_spottings(db, timestamp, registered_leases)

        # A rise or an unchanged level is accepted at once; a drop only after
        # 30 minutes have passed since the status was last accepted.
        if (
            last_status is None
            or status.level >= last_status.level
            or datetime.now() - last_status_change > timedelta(minutes=30)
        ):
            if webhook_url and last_status != status:
                _post_status(webhook_url, status)

            last_status = status
            last_status_change = datetime.now()

        # Build the report data once per cycle, and only when some HTML
        # report (public or internal) is actually requested.
        report_context = None
        if output_internal or any(path.endswith(".html") for path in output):
            report_context = {
                "now": now,
                "leases": registered_leases,
                "status": status,
                "last_change": _find_last_change(db, now, status),
                "log": _build_presence_log(db, now),
                "leaderboard": _build_leaderboard(db, now),
            }

        for output_file in output:
            if output_file.endswith(".csv"):
                logging.debug(f"Outputting CSV file into {output_file}...")
                _write_lease_table(output_file, registered_leases)
            elif output_file.endswith(".lst"):
                logging.debug(f"Outputting LST file into {output_file}...")
                _write_lease_table(output_file, registered_leases, header=now)
            elif output_file.endswith(".html"):
                logging.debug(f"Outputting HTML file into {output_file}...")
                _render_report(jinja_env, output_file, report_context, internal=False)

        if output_internal:
            _render_report(jinja_env, output_internal, report_context, internal=True)

        logging.info(f"Sleeping for {period} seconds.")
        sleep(period)
|
|
|
|
|
|
# Script entry point: run_forever is a click command, so it is called without
# arguments here and takes its options from the command line.
if __name__ == "__main__":
    run_forever()
|