import asyncio
import logging
import os
import json
import re
import textwrap
from asyncio import sleep
import aiohttp
import arrow
from ics import Calendar, Event
from config import Config
config = Config()
config.WEBHOOK_URL = os.getenv("WEBHOOK_URL", config.WEBHOOK_URL)
config.TG_CHAT_ID = os.getenv("TG_CHAT_ID", config.TG_CHAT_ID)
config.TG_API_KEY = os.getenv("TG_API_KEY", config.TG_API_KEY)
STATE_PATH = os.getenv("STATE_PATH", "state.json")
def process_event(event, template):
if event.begin.hour == event.begin.minute == 0:
date_fmt = "dddd, D. MMMM YYYY"
else:
date_fmt = "HH:mm / dddd, D. MMMM YYYY"
date = event.begin.format(date_fmt, locale="cs_CZ")
when = event.begin.humanize()
location = f"@ {event.location}" if event.location else ""
description = event.description or ""
description = description.strip()
if description:
lines = description.splitlines()
if "http" in lines[0]:
url = lines[0].strip()
description = "\n".join(lines[1:])
else:
url = ""
else:
url = ""
description = description or "???"
description = textwrap.shorten(description, width=512, placeholder="...")
output = template.format(
title=event.name,
date=date,
location=location,
when=when,
url=url,
description=description,
)
output = re.sub(r"(
)+", "
", output)
return output
async def send_message(text):
if config.WEBHOOK_URL:
logging.debug(f"Posting via WEBHOOK.")
async with aiohttp.ClientSession() as session:
result = await session.post(
config.WEBHOOK_URL,
json={
"text": text,
"format": "html",
"displayName": "PUBLIC INFORMATION STREAMS & SERVICES",
"avatarUrl": config.AVATAR_URL,
},
)
logging.debug(f"RECV: [{result.status}] {await result.text()}")
if config.TG_CHAT_ID and config.TG_API_KEY:
logging.debug(f"Posting via TELEGRAM.")
async with aiohttp.ClientSession() as session:
result = await session.post(
f"https://api.telegram.org/bot{config.TG_API_KEY}/sendMessage",
json={
"chat_id": config.TG_CHAT_ID,
"text": text.replace("
", "\n"),
"parse_mode": "html",
},
)
logging.debug(f"RECV: [{result.status}] {await result.text()}")
def get_event_id(event: Event) -> str:
return f"{event.begin}//{event.name}"
async def main():
try:
with open(STATE_PATH, "r") as state_fp:
state = json.load(state_fp)
except FileNotFoundError:
state = {"notified": [], "notified_soon": []}
urls = os.getenv("PISS_URLS", "").split(",")
if not urls:
raise RuntimeError("No PISS_URLS specified!")
while True:
calendars = []
async with aiohttp.ClientSession() as session:
for url in urls:
logging.debug(f"Requesting {url}...")
async with session.get(url) as resp:
calendars.append(Calendar(await resp.text()))
future_events = [
event
for calendar in calendars
for event in calendar.events
if event.begin > arrow.now()
]
logging.debug(f"Got {len(future_events)} future events...")
for event in future_events:
if (
get_event_id(event) not in state["notified"]
and event.begin.shift(days=-2) < arrow.now()
):
logging.info(f"Sending description of {event.name}")
await send_message(process_event(event, config.EVENT_TEMPLATE))
state["notified"].append(get_event_id(event))
if (
get_event_id(event) not in state["notified_soon"]
and event.begin.shift(hours=-2) < arrow.now()
):
logging.info(f"Notifying of {event.name}")
await send_message(process_event(event, config.SOON_TEMPLATE))
state["notified_soon"].append(get_event_id(event))
with open(STATE_PATH, "w") as state_fp:
json.dump(state, state_fp)
logging.debug("Sleeping for 60s...")
await sleep(60)
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
)
if config.WEBHOOK_URL is None and (
config.TG_API_KEY is None or config.TG_CHAT_ID is None
):
raise RuntimeError(
"One of WEBHOOK_URL, TG_API_KEY & TG_CHAT_ID must be defined."
)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())