diff --git a/generate.py b/generate.py
index b3fa8d6..af3a0a5 100644
--- a/generate.py
+++ b/generate.py
@@ -47,13 +47,13 @@ def _get_db(filepath: str):
c = connection.cursor()
c.execute(
- '''CREATE TABLE IF NOT EXISTS spottings (
+ """CREATE TABLE IF NOT EXISTS spottings (
id INTEGER PRIMARY KEY,
ts INTEGER,
mac TEXT,
hostname TEXT,
ip TEXT
- )'''
+ )"""
)
connection.commit()
@@ -64,8 +64,8 @@ def _fetch_leases(db, from_ts: datetime):
output = []
c = db.cursor()
query = c.execute(
- 'SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC',
- (int(from_ts.timestamp()),)
+ "SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC",
+ (int(from_ts.timestamp()),),
)
for row in query:
output.append(Lease(datetime.fromtimestamp(row[0]), row[1], row[2], row[3]))
@@ -80,25 +80,39 @@ def _is_human(lease: Lease):
@click.command()
-@click.option('--address', default=os.getenv("ROUTER_IP", "192.168.42.1"), help='Address of the router.')
-@click.option('--period', default=60, help='How often to check for clients (in seconds).')
-@click.option('--ssid', default=os.getenv("SSID", "anabasis"), help='Limit clients to SSID containing this string.')
-@click.option('--webhook-url', help='Webhook URL to post status changes to.')
-@click.option('-o', '--output', multiple=True, help="Output file.")
-def run_forever(address: str, period: int, ssid: str, output: str, webhook_url: Optional[str]):
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - [%(levelname)s] %(message)s')
+@click.option(
+ "--address",
+ default=os.getenv("ROUTER_IP", "192.168.42.1"),
+ help="Address of the router.",
+)
+@click.option(
+ "--period", default=60, help="How often to check for clients (in seconds)."
+)
+@click.option(
+ "--ssid",
+ default=os.getenv("SSID", "anabasis"),
+ help="Limit clients to SSID containing this string.",
+)
+@click.option("--webhook-url", help="Webhook URL to post status changes to.")
+@click.option("-o", "--output", multiple=True, help="Output file.")
+def run_forever(
+ address: str, period: int, ssid: str, output: str, webhook_url: Optional[str]
+):
+ logging.basicConfig(
+ level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
+ )
db = _get_db("clients.sqlite3")
connection = routeros_api.RouterOsApiPool(
address,
- username='admin',
+ username="admin",
password=os.environ["ROUTER_PASSWORD"],
- plaintext_login=True
+ plaintext_login=True,
)
jinja_env = Environment(
- loader=FileSystemLoader('templates'),
- autoescape=select_autoescape(['html', 'xml'])
+ loader=FileSystemLoader("templates"),
+ autoescape=select_autoescape(["html", "xml"]),
)
last_status, last_status_change = None, datetime.now()
@@ -107,25 +121,33 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
logging.info(f"Querying router at {address}...")
api = connection.get_api()
- currently_registered = api.get_resource('/caps-man/registration-table').call('print')
+ currently_registered = api.get_resource("/caps-man/registration-table").call(
+ "print"
+ )
logging.debug(f"Got {len(currently_registered)} registered clients.")
- dhcp_leases = api.get_resource('/ip/dhcp-server/lease').call('print')
+ dhcp_leases = api.get_resource("/ip/dhcp-server/lease").call("print")
logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")
now = datetime.now()
timestamp = int(now.timestamp())
registered_leases: List[Lease] = []
- for client in filter(lambda c: ssid in c['ssid'], currently_registered):
+ for client in filter(lambda c: ssid in c["ssid"], currently_registered):
try:
lease = next(
- lease for lease in dhcp_leases if lease.get('active-mac-address') == client['mac-address']
+ lease
+ for lease in dhcp_leases
+ if lease.get("active-mac-address") == client["mac-address"]
)
except StopIteration:
continue
registered_leases.append(
- Lease(ts=now, ip=lease['active-address'], mac=lease['active-mac-address'],
- hostname=lease.get('host-name'))
+ Lease(
+ ts=now,
+ ip=lease["active-address"],
+ mac=lease["active-mac-address"],
+ hostname=lease.get("host-name"),
+ )
)
registered_leases.sort(key=lambda l: (l.hostname or "").lower())
registered_leases.sort(key=lambda l: not bool(l.hostname))
@@ -135,31 +157,50 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
if len(registered_leases) > 0:
people_cnt = len([lease for lease in registered_leases if _is_human(lease)])
if people_cnt > 4:
- status = Status(level=2, description='FILLED', text="There seems to be a lot of people!")
+ status = Status(
+ level=2,
+ description="FILLED",
+ text="There seems to be a lot of people!",
+ )
elif people_cnt > 0:
- status = Status(level=1, description='POPULATED', text="There seem to be people!")
+ status = Status(
+ level=1, description="POPULATED", text="There seem to be people!"
+ )
else:
- status = Status(level=0, description='EMPTY', text="There are only computers.")
+ status = Status(
+ level=0, description="EMPTY", text="There are only computers."
+ )
else:
- status = Status(level=0, description='VOID', text="There are no devices connected?")
+ status = Status(
+ level=0, description="VOID", text="There are no devices connected?"
+ )
logging.debug("Logging into the database...")
cur = db.cursor()
- cur.executemany("INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)", [
- (timestamp, lease.mac, lease.hostname, lease.ip) for lease in registered_leases
- ])
+ cur.executemany(
+ "INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)",
+ [
+ (timestamp, lease.mac, lease.hostname, lease.ip)
+ for lease in registered_leases
+ ],
+ )
db.commit()
- if not last_status or \
- status.level >= last_status.level or \
- datetime.now() - last_status_change > timedelta(minutes=30):
+ if (
+ not last_status
+ or status.level >= last_status.level
+ or datetime.now() - last_status_change > timedelta(minutes=30)
+ ):
if webhook_url and last_status != status:
- requests.post(webhook_url, json={
- "text": f"Anabasis is now {status.description}! ({status.text})",
- "format": "html",
- "displayName": "ANABASIS PRESENCE",
- })
+ requests.post(
+ webhook_url,
+ json={
+ "text": f"Anabasis is now {status.description}! ({status.text})",
+ "format": "html",
+ "displayName": "ANABASIS PRESENCE",
+ },
+ )
last_status = status
last_status_change = datetime.now()
@@ -167,29 +208,34 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
for output_file in output:
if output_file.endswith(".csv"):
logging.debug(f"Outputting CSV file into {output_file}...")
- with open(output_file, 'w') as file:
+ with open(output_file, "w") as file:
writer = csv.writer(file)
for lease in registered_leases:
writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))
elif output_file.endswith(".lst"):
logging.debug(f"Outputting LST file into {output_file}...")
- with open(output_file, 'w') as file:
+ with open(output_file, "w") as file:
file.write(f"{now}\n")
writer = csv.writer(file)
for lease in registered_leases:
writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))
elif output_file.endswith(".html"):
last_change = None
- for ts, leases in groupby(_fetch_leases(db, now - timedelta(days=7)), key=attrgetter('ts')):
+ for ts, leases in groupby(
+ _fetch_leases(db, now - timedelta(days=7)), key=attrgetter("ts")
+ ):
humans_present = [lease for lease in leases if _is_human(lease)]
if (len(humans_present) > 0) != (status.level > 0):
- last_change = {'ts': ts, 'leases': humans_present}
+ last_change = {"ts": ts, "leases": humans_present}
break
- log_entry = namedtuple('log_entry', ('ts', 'state', 'lease'))
+ log_entry = namedtuple("log_entry", ("ts", "state", "lease"))
log = []
last_seen = []
- for ts, leases in groupby(reversed(_fetch_leases(db, now - timedelta(days=1))), key=attrgetter('ts')):
+ for ts, leases in groupby(
+ reversed(_fetch_leases(db, now - timedelta(days=1))),
+ key=attrgetter("ts"),
+ ):
leases = list(leases)
for lease in leases:
if lease.mac not in (l.mac for l in last_seen):
@@ -205,9 +251,12 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
for idx in range(len(log)):
if idx + 1 == len(log):
continue
- if log[idx].lease.mac == log[idx + 1].lease.mac and \
- not log[idx].state and log[idx + 1].state and \
- log[idx + 1].ts - log[idx].ts < collapse_thresh:
+ if (
+ log[idx].lease.mac == log[idx + 1].lease.mac
+ and not log[idx].state
+ and log[idx + 1].state
+ and log[idx + 1].ts - log[idx].ts < collapse_thresh
+ ):
duplicate_index = idx
if duplicate_index is None:
break
@@ -224,26 +273,28 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
leaderboard_tmp[lease.mac] += 1
mac_to_hostname.setdefault(lease.mac, lease.hostname)
- leaderboard_entry = namedtuple('leaderboard_entry', ('name', 'total'))
+ leaderboard_entry = namedtuple("leaderboard_entry", ("name", "total"))
leaderboard = []
- for mac, minutes in sorted(leaderboard_tmp.items(), key=lambda t: t[1], reverse=True):
+ for mac, minutes in sorted(
+ leaderboard_tmp.items(), key=lambda t: t[1], reverse=True
+ ):
leaderboard.append(
leaderboard_entry(
mac_to_hostname.get(mac) or mac,
- humanize.naturaldelta(timedelta(minutes=minutes))
+ humanize.naturaldelta(timedelta(minutes=minutes)),
)
)
leaderboard = leaderboard[:10]
logging.debug(f"Outputting HTML file into {output_file}...")
- with open(output_file, 'w') as file:
+ with open(output_file, "w") as file:
out_str = jinja_env.get_template("index.html").render(
now=now,
leases=registered_leases,
status=status,
last_change=last_change,
log=log,
- leaderboard=leaderboard
+ leaderboard=leaderboard,
)
file.write(out_str)
@@ -251,5 +302,5 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
sleep(period)
-if __name__ == '__main__':
+if __name__ == "__main__":
run_forever()