import asyncio import socket import time from collections import namedtuple import aiohttp import psutil from bs4 import BeautifulSoup from django.views.generic import TemplateView from humanize import naturalsize from sdbs_infra.dashboard.models import Service, ServiceStatus class IndexView(TemplateView): template_name = "index.html" def get_context_data(self, **kwargs): return { 'services': asyncio.run(self.process_services(list(Service.objects.all()))), 'vps_stats': self.vps_stats() } @staticmethod async def process_services(services): result = [] session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5, sock_connect=1)) for service in services: index_status, index_text = None, None if not service.port or not service.image: try: async with session.get(service.url) as response: index_status, index_text = response.status, await response.text() except asyncio.TimeoutError: pass if service.port: a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) location = ("localhost", service.port) result_of_check = a_socket.connect_ex(location) if result_of_check == 0: status = ServiceStatus.OK else: status = ServiceStatus.DOWN elif index_status: status = ServiceStatus.OK if index_status == 200 else ServiceStatus.DOWN else: status = ServiceStatus.UNKNOWN image = None if service.image: image = service.image.url elif index_text: parsed_html = BeautifulSoup(index_text, features="html.parser") link_tags = parsed_html.find_all('link') for rel in ['apple-touch-icon', 'shortcut', 'icon']: for link_tag in link_tags: if rel in link_tag.attrs['rel']: link = link_tag.attrs['href'] if service.url not in link: image = service.url + (link if link.startswith("/") else f"/{link}") else: image = link result.append({ 'status': status.value, 'image_url': image, **vars(service) }) await session.close() return result # noinspection PyListCreation @staticmethod def vps_stats(): stats = [] stats.append(f"LOAD AVG: {', '.join(map(str, psutil.getloadavg()))}") memory = psutil.virtual_memory() stats.append( f"MEM: {naturalsize(memory.used)}/{naturalsize(memory.total)} ({memory.percent}% USED)" ) disk = psutil.disk_usage('/') stats.append( f"DISK: {naturalsize(disk.used)}/{naturalsize(disk.total)} ({disk.percent}% USED)" ) uptime = normalize_seconds(time.time() - psutil.boot_time()) stats.append( f"UPTIME: {int(uptime.days)} days, {int(uptime.hours)} hours, {int(uptime.minutes)} minutes" ) return " / ".join(map(lambda stat: stat.replace(" ", " "), stats)) def normalize_seconds(seconds: int): (days, remainder) = divmod(seconds, 86400) (hours, remainder) = divmod(remainder, 3600) (minutes, seconds) = divmod(remainder, 60) return namedtuple("_", ("days", "hours", "minutes", "seconds"))(days, hours, minutes, seconds)