import asyncio
import logging
import os
import pickle
import re
import textwrap
from asyncio import sleep
import aiohttp
import arrow
from ics import Calendar, Event
from config import Config
# Settings start from the project's Config object; an environment variable of
# the same name, when present, replaces the configured value.
config = Config()
config.WEBHOOK_URL = os.environ.get("WEBHOOK_URL", config.WEBHOOK_URL)
config.TG_CHAT_ID = os.environ.get("TG_CHAT_ID", config.TG_CHAT_ID)
config.TG_API_KEY = os.environ.get("TG_API_KEY", config.TG_API_KEY)

# Path of the file in which the notification state is pickled.
STATE_PATH = os.environ.get("STATE_PATH", "state.pickle")
class State:
    """Record of the event ids that have already been announced."""

    def __init__(self) -> None:
        # One independent list per notification stage. Instances are pickled
        # as a whole, so the attribute names must stay as they are.
        self.notified, self.notified_soon = [], []
def process_event(event, template):
    """Render a calendar event into a message using ``template``.

    Args:
        event: Event-like object exposing ``name``, ``location``,
            ``description`` and a ``begin`` timestamp that provides ``hour``,
            ``minute``, ``format(fmt, locale=...)`` and ``humanize()``.
        template: ``str.format`` template. It receives the fields ``title``,
            ``date``, ``location``, ``when``, ``url`` and ``description``.

    Returns:
        The formatted message with runs of ``<br>`` tags collapsed into one.
    """
    # A start time of exactly 00:00 is rendered without the time component.
    if event.begin.hour == event.begin.minute == 0:
        date_fmt = "dddd, D. MMMM YYYY"
    else:
        date_fmt = "HH:mm / dddd, D. MMMM YYYY"
    date = event.begin.format(date_fmt, locale="cs_CZ")
    when = event.begin.humanize()
    location = f"@ {event.location}" if event.location else ""

    description = (event.description or "").strip()
    url = ""
    if description:
        lines = description.splitlines()
        # A first line containing "http" is taken as the event link and
        # removed from the description text.
        if "http" in lines[0]:
            url = lines[0].strip()
            description = "\n".join(lines[1:])

    # Fall back to a placeholder when nothing is left, then cap the length
    # (textwrap.shorten also collapses internal whitespace).
    description = description or "???"
    description = textwrap.shorten(description, width=512, placeholder="...")

    output = template.format(
        title=event.name,
        date=date,
        location=location,
        when=when,
        url=url,
        description=description,
    )
    # NOTE(review): in the reviewed text both literals on this line were split
    # across physical lines, which is a syntax error. They are restored here as
    # the HTML line-break tag, on the assumption that the tag was converted to
    # a newline when the file was extracted -- confirm against the upstream
    # file. Empty template fields otherwise leave several breaks in a row.
    output = re.sub(r"(<br>)+", "<br>", output)
    return output
async def send_message(text):
    """Post ``text`` to every configured channel.

    The webhook is used when ``config.WEBHOOK_URL`` is set, and Telegram when
    both ``config.TG_CHAT_ID`` and ``config.TG_API_KEY`` are set. With neither
    configured the call does nothing.

    Args:
        text: HTML-formatted message body.
    """
    if config.WEBHOOK_URL:
        logging.debug("Posting via WEBHOOK.")
        async with aiohttp.ClientSession() as session:
            # Entering the response as a context manager releases the
            # connection even if reading the body fails.
            async with session.post(
                config.WEBHOOK_URL,
                json={
                    "text": text,
                    "format": "html",
                    "displayName": "PUBLIC INFORMATION STREAMS & SERVICES",
                    "avatarUrl": config.AVATAR_URL,
                },
            ) as result:
                logging.debug(f"RECV: [{result.status}] {await result.text()}")

    if config.TG_CHAT_ID and config.TG_API_KEY:
        logging.debug("Posting via TELEGRAM.")
        # NOTE(review): in the reviewed text the first argument of replace()
        # was a string literal split across physical lines (a syntax error).
        # It is restored as the HTML line-break tag, on the assumption that the
        # tag was converted to a newline during extraction -- confirm against
        # the upstream file. Telegram's HTML parse mode has no <br> tag, so
        # line breaks are sent as plain newlines.
        telegram_text = text.replace("<br>", "\n")
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://api.telegram.org/bot{config.TG_API_KEY}/sendMessage",
                json={
                    "chat_id": config.TG_CHAT_ID,
                    "text": telegram_text,
                    "parse_mode": "html",
                },
            ) as result:
                logging.debug(f"RECV: [{result.status}] {await result.text()}")
def get_event_id(event: Event) -> str:
    """Build the identifier under which an event is remembered.

    The id is the ISO calendar tuple of ``event.begin`` joined with
    underscores, followed by ``//`` and the event name.
    """
    iso_key = "_".join(map(str, event.begin.isocalendar()))
    return f"{iso_key}//{event.name}"
async def main():
    """Poll the configured calendars forever and announce upcoming events.

    Each cycle downloads every URL in ``config.URLS`` and keeps the events
    that have not started yet. It sends ``config.EVENT_TEMPLATE`` for events
    starting in less than two days and ``config.SOON_TEMPLATE`` for events
    starting in less than two hours, each at most once per event id. The state
    is then pickled to ``STATE_PATH`` and the loop sleeps for 60 seconds.
    """
    # NOTE(review): pickle.load can execute code embedded in the file, so
    # STATE_PATH must only ever point at a file written by this program.
    try:
        with open(STATE_PATH, "rb") as state_fp:
            state = pickle.load(state_fp)
    except FileNotFoundError:
        # First run: nothing has been announced yet.
        state = State()

    while True:
        calendars = []
        async with aiohttp.ClientSession() as session:
            for url in config.URLS:
                logging.debug(f"Requesting {url}...")
                async with session.get(url) as resp:
                    body = await resp.text()
                    calendars.append(Calendar(body))

        # Keep only the events that have not started yet.
        future_events = []
        for calendar in calendars:
            for event in calendar.events:
                if event.begin > arrow.now():
                    future_events.append(event)
        logging.debug(f"Got {len(future_events)} future events...")

        for event in future_events:
            event_id = get_event_id(event)

            # Full description once the event is less than two days away.
            if (
                event_id not in state.notified
                and event.begin.shift(days=-2) < arrow.now()
            ):
                logging.info(f"Sending description of {event.name}")
                message = process_event(event, config.EVENT_TEMPLATE)
                await send_message(message)
                state.notified.append(event_id)

            # Short reminder once the event is less than two hours away.
            if (
                event_id not in state.notified_soon
                and event.begin.shift(hours=-2) < arrow.now()
            ):
                logging.info(f"Notifying of {event.name}")
                message = process_event(event, config.SOON_TEMPLATE)
                await send_message(message)
                state.notified_soon.append(event_id)

        # Persist after every cycle so a restart does not repeat messages.
        with open(STATE_PATH, "wb") as state_fp:
            pickle.dump(state, state_fp)

        logging.debug("Sleeping for 60s...")
        await sleep(60)
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s - [%(levelname)s] %(message)s"
    )

    # Use the same truthiness tests as send_message: a setting that is present
    # but empty (e.g. WEBHOOK_URL="") would pass an `is None` check and the
    # bot would then run without ever sending anything.
    webhook_ready = bool(config.WEBHOOK_URL)
    telegram_ready = bool(config.TG_API_KEY and config.TG_CHAT_ID)
    if not (webhook_ready or telegram_ready):
        raise RuntimeError(
            "One of WEBHOOK_URL, TG_API_KEY & TG_CHAT_ID must be defined."
        )

    # asyncio.run creates and closes the event loop itself; obtaining a loop
    # through get_event_loop() when none is running is deprecated.
    asyncio.run(main())