reformat w/ black

This commit is contained in:
Tomáš Mládek 2022-01-03 20:11:51 +01:00
parent e27fd9e4e2
commit 2df50267ab
No known key found for this signature in database
GPG key ID: ED21612889E75EC5

View file

@ -47,13 +47,13 @@ def _get_db(filepath: str):
c = connection.cursor()
c.execute(
'''CREATE TABLE IF NOT EXISTS spottings (
"""CREATE TABLE IF NOT EXISTS spottings (
id INTEGER PRIMARY KEY,
ts INTEGER,
mac TEXT,
hostname TEXT,
ip TEXT
)'''
)"""
)
connection.commit()
@ -64,8 +64,8 @@ def _fetch_leases(db, from_ts: datetime):
output = []
c = db.cursor()
query = c.execute(
'SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC',
(int(from_ts.timestamp()),)
"SELECT ts, mac, hostname, ip FROM spottings WHERE ts > ? ORDER BY ts DESC",
(int(from_ts.timestamp()),),
)
for row in query:
output.append(Lease(datetime.fromtimestamp(row[0]), row[1], row[2], row[3]))
@ -80,25 +80,39 @@ def _is_human(lease: Lease):
@click.command()
@click.option('--address', default=os.getenv("ROUTER_IP", "192.168.42.1"), help='Address of the router.')
@click.option('--period', default=60, help='How often to check for clients (in seconds).')
@click.option('--ssid', default=os.getenv("SSID", "anabasis"), help='Limit clients to SSID containing this string.')
@click.option('--webhook-url', help='Webhook URL to post status changes to.')
@click.option('-o', '--output', multiple=True, help="Output file.")
def run_forever(address: str, period: int, ssid: str, output: str, webhook_url: Optional[str]):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - [%(levelname)s] %(message)s')
@click.option(
"--address",
default=os.getenv("ROUTER_IP", "192.168.42.1"),
help="Address of the router.",
)
@click.option(
"--period", default=60, help="How often to check for clients (in seconds)."
)
@click.option(
"--ssid",
default=os.getenv("SSID", "anabasis"),
help="Limit clients to SSID containing this string.",
)
@click.option("--webhook-url", help="Webhook URL to post status changes to.")
@click.option("-o", "--output", multiple=True, help="Output file.")
def run_forever(
address: str, period: int, ssid: str, output: str, webhook_url: Optional[str]
):
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
)
db = _get_db("clients.sqlite3")
connection = routeros_api.RouterOsApiPool(
address,
username='admin',
username="admin",
password=os.environ["ROUTER_PASSWORD"],
plaintext_login=True
plaintext_login=True,
)
jinja_env = Environment(
loader=FileSystemLoader('templates'),
autoescape=select_autoescape(['html', 'xml'])
loader=FileSystemLoader("templates"),
autoescape=select_autoescape(["html", "xml"]),
)
last_status, last_status_change = None, datetime.now()
@ -107,25 +121,33 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
logging.info(f"Querying router at {address}...")
api = connection.get_api()
currently_registered = api.get_resource('/caps-man/registration-table').call('print')
currently_registered = api.get_resource("/caps-man/registration-table").call(
"print"
)
logging.debug(f"Got {len(currently_registered)} registered clients.")
dhcp_leases = api.get_resource('/ip/dhcp-server/lease').call('print')
dhcp_leases = api.get_resource("/ip/dhcp-server/lease").call("print")
logging.debug(f"Got {len(dhcp_leases)} DHCP leases.")
now = datetime.now()
timestamp = int(now.timestamp())
registered_leases: List[Lease] = []
for client in filter(lambda c: ssid in c['ssid'], currently_registered):
for client in filter(lambda c: ssid in c["ssid"], currently_registered):
try:
lease = next(
lease for lease in dhcp_leases if lease.get('active-mac-address') == client['mac-address']
lease
for lease in dhcp_leases
if lease.get("active-mac-address") == client["mac-address"]
)
except StopIteration:
continue
registered_leases.append(
Lease(ts=now, ip=lease['active-address'], mac=lease['active-mac-address'],
hostname=lease.get('host-name'))
Lease(
ts=now,
ip=lease["active-address"],
mac=lease["active-mac-address"],
hostname=lease.get("host-name"),
)
)
registered_leases.sort(key=lambda l: (l.hostname or "").lower())
registered_leases.sort(key=lambda l: not bool(l.hostname))
@ -135,31 +157,50 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
if len(registered_leases) > 0:
people_cnt = len([lease for lease in registered_leases if _is_human(lease)])
if people_cnt > 4:
status = Status(level=2, description='FILLED', text="There seems to be a lot of people!")
status = Status(
level=2,
description="FILLED",
text="There seems to be a lot of people!",
)
elif people_cnt > 0:
status = Status(level=1, description='POPULATED', text="There seem to be people!")
status = Status(
level=1, description="POPULATED", text="There seem to be people!"
)
else:
status = Status(level=0, description='EMPTY', text="There are only computers.")
status = Status(
level=0, description="EMPTY", text="There are only computers."
)
else:
status = Status(level=0, description='VOID', text="There are no devices connected?")
status = Status(
level=0, description="VOID", text="There are no devices connected?"
)
logging.debug("Logging into the database...")
cur = db.cursor()
cur.executemany("INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)", [
(timestamp, lease.mac, lease.hostname, lease.ip) for lease in registered_leases
])
cur.executemany(
"INSERT INTO spottings (ts, mac, hostname, ip) VALUES (?,?,?,?)",
[
(timestamp, lease.mac, lease.hostname, lease.ip)
for lease in registered_leases
],
)
db.commit()
if not last_status or \
status.level >= last_status.level or \
datetime.now() - last_status_change > timedelta(minutes=30):
if (
not last_status
or status.level >= last_status.level
or datetime.now() - last_status_change > timedelta(minutes=30)
):
if webhook_url and last_status != status:
requests.post(webhook_url, json={
"text": f"Anabasis is now <b>{status.description}</b>! ({status.text})",
"format": "html",
"displayName": "ANABASIS PRESENCE",
})
requests.post(
webhook_url,
json={
"text": f"Anabasis is now <b>{status.description}</b>! ({status.text})",
"format": "html",
"displayName": "ANABASIS PRESENCE",
},
)
last_status = status
last_status_change = datetime.now()
@ -167,29 +208,34 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
for output_file in output:
if output_file.endswith(".csv"):
logging.debug(f"Outputting CSV file into {output_file}...")
with open(output_file, 'w') as file:
with open(output_file, "w") as file:
writer = csv.writer(file)
for lease in registered_leases:
writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))
elif output_file.endswith(".lst"):
logging.debug(f"Outputting LST file into {output_file}...")
with open(output_file, 'w') as file:
with open(output_file, "w") as file:
file.write(f"{now}\n")
writer = csv.writer(file)
for lease in registered_leases:
writer.writerow((lease.ip, lease.mac, lease.hostname or "???"))
elif output_file.endswith(".html"):
last_change = None
for ts, leases in groupby(_fetch_leases(db, now - timedelta(days=7)), key=attrgetter('ts')):
for ts, leases in groupby(
_fetch_leases(db, now - timedelta(days=7)), key=attrgetter("ts")
):
humans_present = [lease for lease in leases if _is_human(lease)]
if (len(humans_present) > 0) != (status.level > 0):
last_change = {'ts': ts, 'leases': humans_present}
last_change = {"ts": ts, "leases": humans_present}
break
log_entry = namedtuple('log_entry', ('ts', 'state', 'lease'))
log_entry = namedtuple("log_entry", ("ts", "state", "lease"))
log = []
last_seen = []
for ts, leases in groupby(reversed(_fetch_leases(db, now - timedelta(days=1))), key=attrgetter('ts')):
for ts, leases in groupby(
reversed(_fetch_leases(db, now - timedelta(days=1))),
key=attrgetter("ts"),
):
leases = list(leases)
for lease in leases:
if lease.mac not in (l.mac for l in last_seen):
@ -205,9 +251,12 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
for idx in range(len(log)):
if idx + 1 == len(log):
continue
if log[idx].lease.mac == log[idx + 1].lease.mac and \
not log[idx].state and log[idx + 1].state and \
log[idx + 1].ts - log[idx].ts < collapse_thresh:
if (
log[idx].lease.mac == log[idx + 1].lease.mac
and not log[idx].state
and log[idx + 1].state
and log[idx + 1].ts - log[idx].ts < collapse_thresh
):
duplicate_index = idx
if duplicate_index is None:
break
@ -224,26 +273,28 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
leaderboard_tmp[lease.mac] += 1
mac_to_hostname.setdefault(lease.mac, lease.hostname)
leaderboard_entry = namedtuple('leaderboard_entry', ('name', 'total'))
leaderboard_entry = namedtuple("leaderboard_entry", ("name", "total"))
leaderboard = []
for mac, minutes in sorted(leaderboard_tmp.items(), key=lambda t: t[1], reverse=True):
for mac, minutes in sorted(
leaderboard_tmp.items(), key=lambda t: t[1], reverse=True
):
leaderboard.append(
leaderboard_entry(
mac_to_hostname.get(mac) or mac,
humanize.naturaldelta(timedelta(minutes=minutes))
humanize.naturaldelta(timedelta(minutes=minutes)),
)
)
leaderboard = leaderboard[:10]
logging.debug(f"Outputting HTML file into {output_file}...")
with open(output_file, 'w') as file:
with open(output_file, "w") as file:
out_str = jinja_env.get_template("index.html").render(
now=now,
leases=registered_leases,
status=status,
last_change=last_change,
log=log,
leaderboard=leaderboard
leaderboard=leaderboard,
)
file.write(out_str)
@ -251,5 +302,5 @@ def run_forever(address: str, period: int, ssid: str, output: str, webhook_url:
sleep(period)
if __name__ == '__main__':
if __name__ == "__main__":
run_forever()