#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (c) 2014-18 Richard Hull and contributors # See LICENSE.rst for details. # PYTHON_ARGCOMPLETE_OK import math import time import datetime import random import urllib.request import re import sys import random import signal #from demo_opts import get_device from luma.core import cmdline, error from luma.core.render import canvas from pathlib import Path from PIL import ImageFont def make_font(name, size): return ImageFont.truetype(name, size) def display_settings(device, args): """ Display a short summary of the settings. :rtype: str """ iface = '' display_types = cmdline.get_display_types() if args.display not in display_types['emulator']: iface = f'Interface: {args.interface}\n' lib_name = cmdline.get_library_for_display_type(args.display) if lib_name is not None: lib_version = cmdline.get_library_version(lib_name) else: lib_name = lib_version = 'unknown' import luma.core version = f'luma.{lib_name} {lib_version} (luma.core {luma.core.__version__})' return f'Version: {version}\nDisplay: {args.display}\n{iface}Dimensions: {device.width} x {device.height}\n{"-" * 60}' def get_device(actual_args=None): """ Create device from command-line arguments and return it. """ if actual_args is None: actual_args = sys.argv[1:] parser = cmdline.create_parser(description='luma.examples arguments') args = parser.parse_args(actual_args) if args.config: # load config from file config = cmdline.load_config(args.config) args = parser.parse_args(config + actual_args) # create device try: device = cmdline.create_device(args) print(display_settings(device, args)) return device except error.Error as e: parser.error(e) return None def sigterm_handler(signum, frame): sys.exit(0) def scalepoints(points, blpoints): if device.height == 32: h = 31 else: h = 46 sclpoint = [0] * len(points) blsclpoint = [0] * len(blpoints) scl = 1 maxpnt = 0 i = 0 while i <= len(points) -1: if points[i] > maxpnt: maxpnt = points[i] i += 1 while h * scl < maxpnt: scl += 1 i = 0 while i <= len(points) -1: sclpoint[i] = int(points[i] / scl) i += 1 i = 0 while i <= len(blpoints) -1: blsclpoint[i] = int(blpoints[i] / scl) i += 1 maxcanv = h * scl return sclpoint, blsclpoint, maxcanv def movepoints(points, insertq, blpoints, insertbl, restarts, insertr): i = 1 tmp = points[0] tmp2 = 0 points[0] = insertq while i < len(points): if i == len(points) - 1: points[i] = tmp else: tmp2 = points[i] points[i] = tmp tmp = tmp2 i += 1 i = 1 tmp3 = restarts[0] tmp4 = False restarts[0] = insertr while i < len(restarts): if i == len(restarts) - 1: restarts[i] = tmp3 else: tmp4 = restarts[i] restarts[i] = tmp3 tmp3 = tmp4 i += 1 i = 1 tmp = blpoints[0] tmp2 = 0 blpoints[0] = insertbl while i < len(blpoints): if i == len(blpoints) - 1: blpoints[i] = tmp else: tmp2 = blpoints[i] blpoints[i] = tmp tmp = tmp2 i += 1 i = 1 return points, blpoints, restarts def extractor(prev, flaunch, url): try: data = str(urllib.request.urlopen(url).read()).split("\\n") count = 0 i = 0 while data[i] != "# TYPE blocky_query_total counter": i += 1 i += 1 while True: if data[i] == "# HELP blocky_request_duration_ms Request duration distribution": break else: count += int(re.sub('blocky_query_total{.*?} ', '', data[i])) i += 1 except urllib.error.URLError as e: cliout(f"Connection to server failed: {e.reason}", 2) if flaunch == True: return -1 else: return 0, -1 except: cliout("Extraction (Q) failed.", 2) if flaunch == True: return -1 else: return 0, -1 if flaunch == True: return count else: return count, count - prev def blextractor(prev, flaunch, url): try: data = str(urllib.request.urlopen(url).read()).split("\\n") count = 0 i = 0 while data[i] != "# TYPE blocky_response_total counter": i += 1 i += 1 while True: if "blocky_response_total{reason=\"CACHED\",response_code=\"NOERROR\",response_type=\"CACHED\"}" in data[i]: break else: count += int(re.sub('blocky_response_total{.*?} ', '', data[i])) i += 1 except urllib.error.URLError as e: cliout(f"Connection to server failed: {e.reason}", 2) if flaunch == True: return -1 else: return 0, -1 except: cliout("Extraction (BL) failed.", 2) if flaunch == True: return -1 else: return 0, -1 if flaunch == True: return count else: return count, count - prev def drawout(sclpo, sclbl, maxcanv, failture, restart, distance, dots): match device.height: case 32: # todo: backport with canvas(device) as draw: d1 = 1 d2 = device.width - 66 now = datetime.datetime.now() while d1 < len(sclpo): draw.line((d2 + 10, 31 - sclpo[d1 - 1], d2, 31 - sclpo[d1]), fill="white") d1 += 1 d2 -= 10 draw.text((device.width - 53, device.height - 19), now.strftime("%H:%M"), font=regfont, fill="white") draw.text((0, 0), str(maxcanv), font=smafont, fill="white") draw.text((0, device.height / 2 - 4), str(int(maxcanv / 2)), font=smafont, fill="white") draw.text((0, device.height - 9), "0", font=smafont, fill="white") if failture: draw.text((device.width - 50, 0), "\uf071", font=icons, fill="white") if restart: draw.text((device.width - 30, 0), "\uf021", font=icons, fill="white") case 64: with canvas(device) as draw: # Blocked (solid) Graph currpoint = 1 targetpx = 128 - distance while currpoint < len(sclbl): if (sclbl[currpoint - 1] < sclbl[currpoint]): draw.polygon([(targetpx + distance, 47 - sclbl[currpoint - 1]), (targetpx, 47 - sclbl[currpoint]), (targetpx, 47 - sclbl[currpoint - 1])], fill="white") #cliout(f"RectDBG (less): X1: {targetpx}, Y1: {47 - sclbl[currpoint - 1]}, X2:{targetpx + distance}, Y2: 47",4) draw.rectangle((targetpx, 47 - sclbl[currpoint - 1], targetpx + distance, 47), fill="white") else: draw.polygon([(targetpx + distance, 47 - sclbl[currpoint - 1]), (targetpx + distance, 47 - sclbl[currpoint]), (targetpx, 47 - sclbl[currpoint])], fill="white") #cliout(f"RectDBG (more): X1: {targetpx}, Y1: {47 - sclbl[currpoint]}, X1: {targetpx + distance}, Y2: 47", 4) draw.rectangle((targetpx, 47 - sclbl[currpoint], targetpx + distance, 47), fill="white") currpoint += 1 targetpx -= distance # Main Graph currpoint = 1 targetpx = 128 - distance while currpoint < len(sclpo): draw.line((targetpx + distance, 47 - sclpo[currpoint - 1], targetpx, 47 - sclpo[currpoint]), fill="white") if dots == True: draw.rectangle((targetpx - 1, 47 - sclpo[currpoint] - 1, targetpx + 1, 47 - sclpo[currpoint] + 1), fill="white") currpoint += 1 targetpx -= distance # Draw black boxes to make sure that graph will not interfere draw.rectangle((0, 48, 128, 64), fill="black") draw.rectangle((0, 0, 6 * len(str(maxcanv)), 9), fill="black") draw.rectangle((0, 19, 6 * len(str(maxcanv // 2)), 28), fill="black") draw.rectangle((0, 37, 6, 47), fill="black") # Numbers on the left draw.text((0, 0), str(maxcanv), font=smafont, fill="white") draw.text((0, 19), str(maxcanv // 2), font=smafont, fill="white") draw.text((0, 37), "0", font=smafont, fill="white") # Statusbar, non-constant height values are due how fonts render here. draw.text((75, 45), datetime.datetime.now().strftime("%H:%M"), font=regfont, fill="white") if failture: draw.text((2, 50), "\uf071", font=icons, fill="white") if restart: draw.text((22, 50), "\uf021", font=icons, fill="white") case _: with canvas(device) as draw: draw.text((0,0), "Display not supported", font=smafont, fill="white") def getdata(oldtotal, oldbltotal, url): total, query = extractor(oldtotal, False, url) bltotal, blquery = blextractor(oldbltotal, False, url) # Server lifetime counter | Difference | Err? | Restarted? if query == -1: cliout("Extractor finished with error.", 1) return oldtotal, 0, bltotal, 0, True, False if total < oldtotal: cliout("Server restart was detected.", 1) return total, 0, bltotal, 0, False, True return total, query, bltotal, blquery, False, False def cliout(data, code): time = datetime.datetime.now().strftime("%H:%M:%S") match code: case 0: codes = "I" case 1: codes = "W" case 2: codes = "E" case 3: codes = "F" case 4: codes = "V" print(f"[{codes}] [{time}] {data}") def main(): cliout("Blocky Graph Monitor for OLED v0.9b", 0) cliout("Nikopol 2024", 0) cliout("Init...", 0) # vars spagetti url = "http://127.0.0.1:4500/metrics" pointscount = 14 distance = 10 today_last_time = "NaN" last_hrs = "NaN" i = 1 failture = False restart = False rstate = False dots = False lightsoff = False start = 22 stop = 6 device.contrast(1) # Load config try: filedata = open("config.ini").readlines() except FileNotFoundError: f = open("config.ini", "x") f.write("[source]\nurl = http://127.0.0.1:4500/metrics\n[appearance]\npoints = 14\npointsDistance = 10\ndots = True\n[lightsoff]\nenabled = False\nstart = 23\nstop = 06") f.close() cliout("config.ini does not exist. Please edit newly created file and start the program.", 3) exit() except: cliout("Failed to read/write config file. Are permissions correct?", 3) exit() while i < len(filedata): if "url = " in filedata[i]: url = filedata[i][6:].rstrip() if "points = " in filedata[i]: pointscount = int(filedata[i][9:]) if "pointsDistance = " in filedata[i]: distance = int(filedata[i][17:]) if "dots = True" in filedata[i]: dots = True if "enabled = True" in filedata[i]: lightsoff = True if "start = " in filedata[i]: start = int(filedata[i][8:]) if "stop = " in filedata[i]: stop = int(filedata[i][7:]) i += 1 i = 0 cliout("Config load ok", 0) pointsmap = [0] * pointscount blockedmap = [0] * len(pointsmap) restartmap = [True] * len(pointsmap) total = extractor(0, True, url) bltotal = blextractor(0, True, url) # Show fail on boot time if occurs. if total == -1: failture = True while True: now = datetime.datetime.now() today_time = now.strftime("%H:%M") hrs = now.strftime("%H") #cliout(f"start is {start}", 1) if today_time != today_last_time: #if 1: today_last_time = today_time if hrs != last_hrs: #if 1: if (int(hrs) < stop and int(hrs) >= start and lightsoff == True): cliout(f"Lightsoff is enabled and within time range. Screen will be black this hour.", 0) last_hrs = hrs # Grab fresh data cliout("Extracting data...", 0) total, query, bltotal, blquery, failture, rstate = getdata(total, bltotal, url) #print(total) #print(query) # Now write it to (raw) array and move older data pointsmap, blockedmap, restartmap = movepoints(pointsmap, query, blockedmap, blquery, restartmap, rstate) # ...and scale it to screen size sclpo, sclbl, maxcanv = scalepoints(pointsmap, blockedmap) # also write restart map restart = False while i < len(restartmap): if restartmap[i] == True: restart = True i += 1 i = 0 cliout(f"Done. Current: {query}, Blocked: {blquery}", 0) if (int(hrs) < stop and int(hrs) >= start and lightsoff == True): with canvas(device) as draw: # :troll: hrs = hrs else: drawout(sclpo, sclbl, maxcanv, failture, restart, distance, dots) time.sleep(1) if __name__ == "__main__": try: print("[I] Preinit...") device = get_device() regfont = make_font("TerminusTTF-4.49.3.ttf", 22) smafont = make_font("TerminusTTF-4.49.3.ttf", 12) icons = make_font("fontawesome-webfont.ttf", 15) signal.signal(signal.SIGTERM, sigterm_handler) main() except KeyboardInterrupt: pass