piss-bot/bot.py

160 lines
4.8 KiB
Python
Raw Permalink Normal View History

2021-02-26 22:29:06 +01:00
import asyncio
import logging
2021-02-27 10:38:58 +01:00
import os
2021-05-26 20:35:37 +02:00
import json
2021-02-27 09:52:47 +01:00
import re
2021-02-26 22:29:06 +01:00
import textwrap
from asyncio import sleep
import aiohttp
import arrow
2021-02-27 10:53:27 +01:00
from ics import Calendar, Event
2021-02-26 22:29:06 +01:00
from config import Config
config = Config()
2021-02-27 10:38:58 +01:00
config.WEBHOOK_URL = os.getenv("WEBHOOK_URL", config.WEBHOOK_URL)
2021-02-27 11:54:04 +01:00
config.TG_CHAT_ID = os.getenv("TG_CHAT_ID", config.TG_CHAT_ID)
config.TG_API_KEY = os.getenv("TG_API_KEY", config.TG_API_KEY)
2021-02-26 22:29:06 +01:00
2021-05-26 20:35:37 +02:00
STATE_PATH = os.getenv("STATE_PATH", "state.json")
2021-02-26 22:29:06 +01:00
def process_event(event, template):
if event.begin.hour == event.begin.minute == 0:
2021-05-26 18:29:08 +02:00
date_fmt = "dddd, D. MMMM YYYY"
2021-02-26 22:29:06 +01:00
else:
2021-05-26 18:29:08 +02:00
date_fmt = "HH:mm / dddd, D. MMMM YYYY"
2021-02-26 22:29:06 +01:00
2021-05-26 18:29:08 +02:00
date = event.begin.format(date_fmt, locale="cs_CZ")
2021-02-26 22:29:06 +01:00
when = event.begin.humanize()
location = f"@ {event.location}" if event.location else ""
description = event.description or ""
description = description.strip()
if description:
lines = description.splitlines()
if "http" in lines[0]:
url = lines[0].strip()
description = "\n".join(lines[1:])
else:
url = ""
else:
url = ""
description = description or "???"
description = textwrap.shorten(description, width=512, placeholder="...")
2021-02-27 09:52:47 +01:00
output = template.format(
2021-02-26 22:29:06 +01:00
title=event.name,
date=date,
location=location,
when=when,
url=url,
2021-05-26 18:29:08 +02:00
description=description,
2021-02-26 22:29:06 +01:00
)
2021-05-26 18:29:08 +02:00
output = re.sub(r"(<br>)+", "<br>", output)
2021-02-27 09:52:47 +01:00
return output
2021-02-26 22:29:06 +01:00
async def send_message(text):
2021-02-27 11:54:04 +01:00
if config.WEBHOOK_URL:
logging.debug(f"Posting via WEBHOOK.")
async with aiohttp.ClientSession() as session:
2021-05-26 18:29:08 +02:00
result = await session.post(
config.WEBHOOK_URL,
json={
"text": text,
"format": "html",
"displayName": "PUBLIC INFORMATION STREAMS & SERVICES",
"avatarUrl": config.AVATAR_URL,
},
)
2021-02-27 11:54:04 +01:00
logging.debug(f"RECV: [{result.status}] {await result.text()}")
if config.TG_CHAT_ID and config.TG_API_KEY:
logging.debug(f"Posting via TELEGRAM.")
async with aiohttp.ClientSession() as session:
2021-05-26 18:29:08 +02:00
result = await session.post(
f"https://api.telegram.org/bot{config.TG_API_KEY}/sendMessage",
json={
"chat_id": config.TG_CHAT_ID,
"text": text.replace("<br>", "\n"),
"parse_mode": "html",
},
)
2021-02-27 11:54:04 +01:00
logging.debug(f"RECV: [{result.status}] {await result.text()}")
2021-02-26 22:29:06 +01:00
2021-02-27 10:53:27 +01:00
def get_event_id(event: Event) -> str:
return f"{event.begin}//{event.name}"
2021-02-27 10:53:27 +01:00
2021-02-26 22:29:06 +01:00
async def main():
try:
2021-05-26 20:35:37 +02:00
with open(STATE_PATH, "r") as state_fp:
state = json.load(state_fp)
2021-02-26 22:29:06 +01:00
except FileNotFoundError:
2021-05-26 20:35:37 +02:00
state = {"notified": [], "notified_soon": []}
2021-02-26 22:29:06 +01:00
2021-06-12 22:02:30 +02:00
urls = os.getenv("PISS_URLS", "").split(",")
if not urls:
raise RuntimeError("No PISS_URLS specified!")
2021-02-26 22:29:06 +01:00
while True:
calendars = []
async with aiohttp.ClientSession() as session:
2021-06-12 22:02:30 +02:00
for url in urls:
2021-02-26 22:29:06 +01:00
logging.debug(f"Requesting {url}...")
async with session.get(url) as resp:
calendars.append(Calendar(await resp.text()))
2021-05-26 18:29:08 +02:00
future_events = [
event
for calendar in calendars
for event in calendar.events
if event.begin > arrow.now()
]
2021-02-26 22:29:06 +01:00
logging.debug(f"Got {len(future_events)} future events...")
for event in future_events:
2021-05-26 18:29:08 +02:00
if (
2021-05-26 20:35:37 +02:00
get_event_id(event) not in state["notified"]
2021-05-26 18:29:08 +02:00
and event.begin.shift(days=-2) < arrow.now()
):
2021-02-26 22:34:03 +01:00
logging.info(f"Sending description of {event.name}")
2021-02-26 22:29:06 +01:00
await send_message(process_event(event, config.EVENT_TEMPLATE))
2021-05-26 20:35:37 +02:00
state["notified"].append(get_event_id(event))
2021-02-26 22:29:06 +01:00
2021-05-26 18:29:08 +02:00
if (
2021-05-26 20:35:37 +02:00
get_event_id(event) not in state["notified_soon"]
2021-05-26 18:29:08 +02:00
and event.begin.shift(hours=-2) < arrow.now()
):
2021-02-26 22:34:03 +01:00
logging.info(f"Notifying of {event.name}")
2021-02-26 22:29:06 +01:00
await send_message(process_event(event, config.SOON_TEMPLATE))
2021-05-26 20:35:37 +02:00
state["notified_soon"].append(get_event_id(event))
2021-02-26 22:29:06 +01:00
2021-05-26 20:35:37 +02:00
with open(STATE_PATH, "w") as state_fp:
json.dump(state, state_fp)
2021-02-26 22:29:06 +01:00
logging.debug("Sleeping for 60s...")
await sleep(60)
2021-05-26 18:29:08 +02:00
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
)
2021-02-26 22:29:06 +01:00
2021-05-26 18:29:08 +02:00
if config.WEBHOOK_URL is None and (
config.TG_API_KEY is None or config.TG_CHAT_ID is None
):
raise RuntimeError(
"One of WEBHOOK_URL, TG_API_KEY & TG_CHAT_ID must be defined."
)
2021-02-26 22:29:06 +01:00
loop = asyncio.get_event_loop()
loop.run_until_complete(main())