"""Poll iCalendar feeds and announce upcoming events via a webhook and/or Telegram."""
import asyncio
import logging
import os
import pickle
import re
import textwrap
from asyncio import sleep

import aiohttp
import arrow
from ics import Calendar, Event

from config import Config

config = Config()
# Environment variables, when set, take precedence over the values in Config.
config.WEBHOOK_URL = os.getenv("WEBHOOK_URL", config.WEBHOOK_URL)
config.TG_CHAT_ID = os.getenv("TG_CHAT_ID", config.TG_CHAT_ID)
config.TG_API_KEY = os.getenv("TG_API_KEY", config.TG_API_KEY)

STATE_PATH = os.getenv("STATE_PATH", "state.pickle")


class State:
    """Record of which events have already been announced.

    The lists hold event ids as produced by ``get_event_id``.  They are
    instance attributes on purpose: pickle stores an object's instance
    ``__dict__``, so lists kept only on the class would never reach the
    state file.
    """

    def __init__(self):
        # Ids of events whose full description was already sent.
        self.notified = []
        # Ids of events whose "starting soon" reminder was already sent.
        self.notified_soon = []


def _load_state():
    """Load the pickled State from STATE_PATH, or start fresh if it is missing."""
    try:
        # NOTE: pickle can execute arbitrary code while loading; STATE_PATH
        # must only ever point at a file this program wrote itself.
        with open(STATE_PATH, 'rb') as state_fp:
            state = pickle.load(state_fp)
    except FileNotFoundError:
        return State()
    # State files written while the lists were class attributes carry no
    # instance data; give such objects their own lists so that subsequent
    # saves actually persist something.
    for attr in ("notified", "notified_soon"):
        if attr not in vars(state):
            setattr(state, attr, [])
    return state


def process_event(event, template):
    """Render ``event`` into a message using ``template``.

    Args:
        event: Calendar event; ``begin``, ``name``, ``location`` and
            ``description`` are read from it.
        template: ``str.format`` template that may use the fields
            ``title``, ``date``, ``location``, ``when``, ``url`` and
            ``description``.

    Returns:
        The formatted message with runs of consecutive newlines collapsed
        into a single one.
    """
    # A start time of exactly 00:00 is rendered as a date only.
    if event.begin.hour == event.begin.minute == 0:
        date_fmt = 'dddd, D. MMMM YYYY'
    else:
        date_fmt = 'HH:mm / dddd, D. MMMM YYYY'
    date = event.begin.format(date_fmt, locale='cs_CZ')
    when = event.begin.humanize()
    location = f"@ {event.location}" if event.location else ""

    description = (event.description or "").strip()
    url = ""
    if description:
        lines = description.splitlines()
        # A first line containing "http" is taken as the event URL and
        # removed from the description text.
        if "http" in lines[0]:
            url = lines[0].strip()
            description = "\n".join(lines[1:])
    description = description or "???"
    description = textwrap.shorten(description, width=512, placeholder="...")

    output = template.format(
        title=event.name,
        date=date,
        location=location,
        when=when,
        url=url,
        description=description,
    )
    # NOTE(review): in the reviewed copy the two string literals below were
    # split across physical lines, i.e. they contained a raw line break; they
    # are reproduced here as "\n".  The original token may have been an HTML
    # line-break tag lost in extraction -- confirm against the upstream file.
    output = re.sub(r'(\n)+', '\n', output)
    return output


async def send_message(text):
    """Post ``text`` to every configured channel (webhook and/or Telegram).

    A channel is used only when its configuration values are truthy.  The
    HTTP status and response body are logged at DEBUG level; non-2xx
    responses are not treated as errors.
    """
    if config.WEBHOOK_URL:
        logging.debug("Posting via WEBHOOK.")
        payload = {
            "text": text,
            "format": "html",
            "displayName": "PUBLIC INFORMATION STREAMS & SERVICES",
            "avatarUrl": config.AVATAR_URL,
        }
        async with aiohttp.ClientSession() as session:
            # ``async with`` releases the response as soon as it has been read.
            async with session.post(config.WEBHOOK_URL, json=payload) as result:
                logging.debug(f"RECV: [{result.status}] {await result.text()}")

    if config.TG_CHAT_ID and config.TG_API_KEY:
        logging.debug("Posting via TELEGRAM.")
        payload = {
            "chat_id": config.TG_CHAT_ID,
            # NOTE(review): as reproduced from the reviewed copy this replaces
            # a newline with a newline (a no-op); the search string may
            # originally have been an HTML line-break tag -- confirm upstream.
            "text": text.replace('\n', '\n'),
            "parse_mode": "html",
        }
        endpoint = f"https://api.telegram.org/bot{config.TG_API_KEY}/sendMessage"
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, json=payload) as result:
                logging.debug(f"RECV: [{result.status}] {await result.text()}")


def get_event_id(event: Event) -> str:
    """Return the key used to remember that ``event`` was announced.

    The key combines the start time and the name, so changing either makes
    the event count as new.
    """
    return f"{event.begin}//{event.name}"


async def main():
    """Poll all calendars in ``config.URLS`` every 60 seconds and send notifications.

    For each event that has not started yet, the full description
    (``config.EVENT_TEMPLATE``) is sent once the start is less than two days
    away, and a reminder (``config.SOON_TEMPLATE``) once it is less than two
    hours away.  Sent notifications are recorded in the state file at
    STATE_PATH after every polling pass.  Runs until interrupted.
    """
    state = _load_state()

    while True:
        calendars = []
        async with aiohttp.ClientSession() as session:
            for url in config.URLS:
                logging.debug(f"Requesting {url}...")
                async with session.get(url) as resp:
                    calendars.append(Calendar(await resp.text()))

        future_events = [
            event
            for calendar in calendars
            for event in calendar.events
            if event.begin > arrow.now()
        ]
        logging.debug(f"Got {len(future_events)} future events...")

        for event in future_events:
            event_id = get_event_id(event)
            if event_id not in state.notified and event.begin.shift(days=-2) < arrow.now():
                logging.info(f"Sending description of {event.name}")
                await send_message(process_event(event, config.EVENT_TEMPLATE))
                state.notified.append(event_id)
            if event_id not in state.notified_soon and event.begin.shift(hours=-2) < arrow.now():
                logging.info(f"Notifying of {event.name}")
                await send_message(process_event(event, config.SOON_TEMPLATE))
                state.notified_soon.append(event_id)

        with open(STATE_PATH, 'wb') as state_fp:
            pickle.dump(state, state_fp)

        logging.debug("Sleeping for 60s...")
        await sleep(60)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - [%(levelname)s] %(message)s')

    # Truthiness (not ``is None``) mirrors the checks in send_message, so an
    # empty-string setting is rejected here instead of silently sending nothing.
    if not config.WEBHOOK_URL and not (config.TG_API_KEY and config.TG_CHAT_ID):
        raise RuntimeError("One of WEBHOOK_URL, TG_API_KEY & TG_CHAT_ID must be defined.")

    asyncio.run(main())