piss-bot/bot.py

159 lines
4.8 KiB
Python

import asyncio
import logging
import os
import json
import re
import textwrap
from asyncio import sleep
import aiohttp
import arrow
from ics import Calendar, Event
from config import Config
config = Config()
config.WEBHOOK_URL = os.getenv("WEBHOOK_URL", config.WEBHOOK_URL)
config.TG_CHAT_ID = os.getenv("TG_CHAT_ID", config.TG_CHAT_ID)
config.TG_API_KEY = os.getenv("TG_API_KEY", config.TG_API_KEY)
STATE_PATH = os.getenv("STATE_PATH", "state.json")
def process_event(event, template):
if event.begin.hour == event.begin.minute == 0:
date_fmt = "dddd, D. MMMM YYYY"
else:
date_fmt = "HH:mm / dddd, D. MMMM YYYY"
date = event.begin.format(date_fmt, locale="cs_CZ")
when = event.begin.humanize()
location = f"@ {event.location}" if event.location else ""
description = event.description or ""
description = description.strip()
if description:
lines = description.splitlines()
if "http" in lines[0]:
url = lines[0].strip()
description = "\n".join(lines[1:])
else:
url = ""
else:
url = ""
description = description or "???"
description = textwrap.shorten(description, width=512, placeholder="...")
output = template.format(
title=event.name,
date=date,
location=location,
when=when,
url=url,
description=description,
)
output = re.sub(r"(<br>)+", "<br>", output)
return output
async def send_message(text):
if config.WEBHOOK_URL:
logging.debug(f"Posting via WEBHOOK.")
async with aiohttp.ClientSession() as session:
result = await session.post(
config.WEBHOOK_URL,
json={
"text": text,
"format": "html",
"displayName": "PUBLIC INFORMATION STREAMS & SERVICES",
"avatarUrl": config.AVATAR_URL,
},
)
logging.debug(f"RECV: [{result.status}] {await result.text()}")
if config.TG_CHAT_ID and config.TG_API_KEY:
logging.debug(f"Posting via TELEGRAM.")
async with aiohttp.ClientSession() as session:
result = await session.post(
f"https://api.telegram.org/bot{config.TG_API_KEY}/sendMessage",
json={
"chat_id": config.TG_CHAT_ID,
"text": text.replace("<br>", "\n"),
"parse_mode": "html",
},
)
logging.debug(f"RECV: [{result.status}] {await result.text()}")
def get_event_id(event: Event) -> str:
return f"{event.begin}//{event.name}"
async def main():
try:
with open(STATE_PATH, "r") as state_fp:
state = json.load(state_fp)
except FileNotFoundError:
state = {"notified": [], "notified_soon": []}
urls = os.getenv("PISS_URLS", "").split(",")
if not urls:
raise RuntimeError("No PISS_URLS specified!")
while True:
calendars = []
async with aiohttp.ClientSession() as session:
for url in urls:
logging.debug(f"Requesting {url}...")
async with session.get(url) as resp:
calendars.append(Calendar(await resp.text()))
future_events = [
event
for calendar in calendars
for event in calendar.events
if event.begin > arrow.now()
]
logging.debug(f"Got {len(future_events)} future events...")
for event in future_events:
if (
get_event_id(event) not in state["notified"]
and event.begin.shift(days=-2) < arrow.now()
):
logging.info(f"Sending description of {event.name}")
await send_message(process_event(event, config.EVENT_TEMPLATE))
state["notified"].append(get_event_id(event))
if (
get_event_id(event) not in state["notified_soon"]
and event.begin.shift(hours=-2) < arrow.now()
):
logging.info(f"Notifying of {event.name}")
await send_message(process_event(event, config.SOON_TEMPLATE))
state["notified_soon"].append(get_event_id(event))
with open(STATE_PATH, "w") as state_fp:
json.dump(state, state_fp)
logging.debug("Sleeping for 60s...")
await sleep(60)
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
)
if config.WEBHOOK_URL is None and (
config.TG_API_KEY is None or config.TG_CHAT_ID is None
):
raise RuntimeError(
"One of WEBHOOK_URL, TG_API_KEY & TG_CHAT_ID must be defined."
)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())