import asyncio
import socket
import time
from collections import namedtuple
from datetime import datetime
from operator import itemgetter
from urllib.parse import urlparse

import aiohttp
import feedparser
import psutil
from aiohttp import ClientConnectorError
from bs4 import BeautifulSoup
from django.views.generic import TemplateView
from humanize import naturalsize

from sdbs_infra import settings
from sdbs_infra.dashboard.models import Service, Status, Link, Machine, Feed


class IndexView(TemplateView):
    """Dashboard landing page: aggregates links, service statuses, machine
    health checks, feed items and local host statistics into one context."""

    template_name = "index.html"

    def get_context_data(self, **kwargs):
        """Build the template context.

        Each section is produced by its own coroutine driven to completion
        with ``asyncio.run`` (the querysets are materialised with ``list``
        first so no ORM access happens inside the event loop).
        """
        return {
            'links': asyncio.run(self.process_links(list(Link.objects.all()))),
            'services': asyncio.run(self.process_services(list(Service.objects.all()))),
            'machines': asyncio.run(self.process_machines(list(Machine.objects.all()))),
            'feed_items': asyncio.run(self.process_feeds(list(Feed.objects.all()))),
            'vps_stats': self.vps_stats()
        }

    async def process_links(self, links):
        """Resolve an image URL for every link.

        Uses the uploaded image when present, otherwise fetches the link's
        index page and scrapes a favicon from it. Network failures simply
        leave the image as ``None``.
        """
        result = []
        # ``async with`` guarantees the session is closed even if a request
        # raises (the original leaked it on exceptions other than the two
        # caught below).
        async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=5, sock_connect=1)) as session:
            for link in links:
                index_text = None
                if not link.image:
                    try:
                        async with session.get(link.url) as response:
                            index_text = await response.text()
                    except (asyncio.TimeoutError, ClientConnectorError):
                        pass  # best effort: no favicon for unreachable sites
                image = link.image.url if link.image else self.extract_favicon(link.url, index_text)
                result.append({
                    'image_url': image,
                    **vars(link)
                })
        return result

    async def process_services(self, services):
        """Determine an up/down status and an image URL for every service.

        Services with a configured port are probed via a local TCP connect;
        otherwise the HTTP status of their index page decides. The index page
        is also fetched when a favicon needs to be scraped.
        """
        result = []
        async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=5, sock_connect=1)) as session:
            for service in services:
                index_status, index_text = None, None
                if not service.port or not service.image:
                    try:
                        async with session.get(service.url) as response:
                            index_status, index_text = response.status, await response.text()
                    except (asyncio.TimeoutError, ClientConnectorError):
                        pass  # index_status stays None -> status falls back below

                if service.port:
                    # Probe the local TCP port directly. The context manager
                    # closes the socket (the original leaked one per service).
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                        reachable = probe.connect_ex(("localhost", service.port)) == 0
                    status = Status.OK if reachable else Status.DOWN
                elif index_status:
                    status = Status.OK if index_status == 200 else Status.DOWN
                else:
                    status = Status.UNKNOWN

                image = service.image.url if service.image else self.extract_favicon(service.url, index_text)
                result.append({
                    'status': status.value,
                    'image_url': image,
                    **vars(service)
                })
        return result

    async def process_machines(self, machines):
        """Query healthchecks.io for each machine's status and last ping.

        Machines without a configured check (or when no API key is set) are
        reported as UNKNOWN with ``last_ping`` of ``None``.
        """
        result = []
        # The original never closed this session; ``async with`` fixes the leak.
        async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=5, sock_connect=1),
                headers={'X-Api-Key': settings.HEALTCHECKS_API_KEY}) as session:
            for machine in machines:
                status = Status.UNKNOWN
                last_ping = None
                if settings.HEALTCHECKS_API_KEY and machine.healthcheck_id:
                    try:
                        async with session.get(
                                f"https://healthchecks.io/api/v1/checks/{machine.healthcheck_id}") as response:
                            check = await response.json()
                            status = {
                                'up': Status.OK,
                                'down': Status.DOWN
                            }.get(check.get('status'), Status.UNKNOWN)
                            # A check that has never pinged reports null here;
                            # fromisoformat(None) would raise.
                            if check.get('last_ping'):
                                last_ping = datetime.fromisoformat(check['last_ping'])
                    except (asyncio.TimeoutError, ClientConnectorError):
                        pass  # leave status UNKNOWN on network failure
                result.append({
                    'status': status.value,
                    'last_ping': last_ping,
                    **vars(machine)
                })
        return result

    async def process_feeds(self, feeds):
        """Fetch and merge all configured feeds, newest entries first.

        Unreachable feeds are skipped. Entries without a parsed publication
        date are dropped: they can neither carry ``published_datetime`` nor
        participate in the final sort.
        """
        result = []
        async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=5, sock_connect=1)) as session:
            for feed in feeds:
                try:
                    async with session.get(feed.url) as response:
                        parsed_feed = feedparser.parse(await response.text())
                except (asyncio.TimeoutError, ClientConnectorError):
                    continue
                for entry in parsed_feed.entries:
                    if entry.get('published_parsed'):
                        entry.published_datetime = datetime(*entry.published_parsed[0:6])
                        result.append(entry)
        result.sort(key=itemgetter('published_parsed'), reverse=True)
        return result

    @staticmethod
    def extract_favicon(url, index_text):
        """Return the best icon URL declared in *index_text*, or ``None``.

        Icons are preferred in the order apple-touch-icon, shortcut, icon.
        Relative hrefs are resolved against the origin of *url*.
        """
        if not index_text:
            return None
        scheme, netloc, *_ = urlparse(url)
        base_url = (f"{scheme}://" if scheme else "") + netloc
        parsed_html = BeautifulSoup(index_text, features="html.parser")
        link_tags = parsed_html.find_all('link')
        for rel in ['apple-touch-icon', 'shortcut', 'icon']:
            for link_tag in link_tags:
                # .get avoids KeyError on <link> tags without rel/href.
                if rel in link_tag.attrs.get('rel', ()) and 'href' in link_tag.attrs:
                    href = link_tag.attrs['href']
                    if netloc not in href and not (href.startswith("//") or href.startswith("http")):
                        # Relative path: anchor it to the page's origin.
                        return base_url + (href if href.startswith("/") else f"/{href}")
                    return href
        # No icon declared (original fell through to an UnboundLocalError here).
        return None

    # noinspection PyListCreation
    @staticmethod
    def vps_stats():
        """Return a single formatted line of host statistics (load, memory,
        disk, uptime) for display in the dashboard footer."""
        stats = []
        stats.append(f"LOAD AVG: {', '.join(map(str, psutil.getloadavg()))}")
        memory = psutil.virtual_memory()
        stats.append(
            f"MEM: {naturalsize(memory.used)}/{naturalsize(memory.total)} ({memory.percent}% USED)"
        )
        disk = psutil.disk_usage('/')
        stats.append(
            f"DISK: {naturalsize(disk.used)}/{naturalsize(disk.total)} ({disk.percent}% USED)"
        )
        uptime = normalize_seconds(time.time() - psutil.boot_time())
        stats.append(
            f"UPTIME: {int(uptime.days)} days, {int(uptime.hours)} hours, {int(uptime.minutes)} minutes"
        )
        # NOTE(review): the replacement target below appears to swap regular
        # spaces for non-breaking spaces inside each stat — verify the second
        # argument really is U+00A0 in the committed file.
        return " / ".join(map(lambda stat: stat.replace(" ", " "), stats))


def normalize_seconds(seconds: float):
    """Split a duration in seconds into (days, hours, minutes, seconds).

    Returns a named tuple with fields ``days``, ``hours``, ``minutes`` and
    ``seconds``; values are floats when the input is a float.
    """
    (days, remainder) = divmod(seconds, 86400)
    (hours, remainder) = divmod(remainder, 3600)
    (minutes, seconds) = divmod(remainder, 60)
    return namedtuple("_", ("days", "hours", "minutes", "seconds"))(days, hours, minutes, seconds)