BlockyGrapher/dnsmonitor.py

261 lines
8.4 KiB
Python
Raw Permalink Normal View History

2024-06-01 19:24:32 +04:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014-18 Richard Hull and contributors
# See LICENSE.rst for details.
# PYTHON_ARGCOMPLETE_OK
import math
import time
import datetime
import random
import urllib.request
import re
import sys
import random
import signal
#from demo_opts import get_device
from luma.core import cmdline, error
from luma.core.render import canvas
from pathlib import Path
from PIL import ImageFont
def make_font(name, size):
return ImageFont.truetype(name, size)
def display_settings(device, args):
"""
Display a short summary of the settings.
:rtype: str
"""
iface = ''
display_types = cmdline.get_display_types()
if args.display not in display_types['emulator']:
iface = f'Interface: {args.interface}\n'
lib_name = cmdline.get_library_for_display_type(args.display)
if lib_name is not None:
lib_version = cmdline.get_library_version(lib_name)
else:
lib_name = lib_version = 'unknown'
import luma.core
version = f'luma.{lib_name} {lib_version} (luma.core {luma.core.__version__})'
return f'Version: {version}\nDisplay: {args.display}\n{iface}Dimensions: {device.width} x {device.height}\n{"-" * 60}'
def get_device(actual_args=None):
"""
Create device from command-line arguments and return it.
"""
if actual_args is None:
actual_args = sys.argv[1:]
parser = cmdline.create_parser(description='luma.examples arguments')
args = parser.parse_args(actual_args)
if args.config:
# load config from file
config = cmdline.load_config(args.config)
args = parser.parse_args(config + actual_args)
# create device
try:
device = cmdline.create_device(args)
print(display_settings(device, args))
return device
except error.Error as e:
parser.error(e)
return None
def sigterm_handler(signum, frame):
sys.exit(0)
def scalepoints(points):
if device.height == 32:
h = 31
else:
h = 46
sclpoint = [0] * len(points)
2024-06-01 19:24:32 +04:00
scl = 1
maxpnt = 0
i = 0
while i <= len(points) -1:
2024-06-01 19:24:32 +04:00
if points[i] > maxpnt:
maxpnt = points[i]
i += 1
while h * scl < maxpnt:
scl += 1
i = 0
while i <= len(points) -1:
2024-06-01 19:24:32 +04:00
sclpoint[i] = int(points[i] / scl)
i += 1
maxcanv = h * scl
return sclpoint, maxcanv
def movepoints(points, insertq, restarts, insertr):
2024-06-01 19:24:32 +04:00
i = 1
tmp = points[0]
tmp2 = 0
points[0] = insertq
while i < len(points):
if i == len(points) - 1:
2024-06-01 19:24:32 +04:00
points[i] = tmp
else:
tmp2 = points[i]
points[i] = tmp
tmp = tmp2
i += 1
i = 1
tmp3 = restarts[0]
tmp4 = False
restarts[0] = insertr
while i < len(restarts):
if i == len(restarts) - 1:
restarts[i] = tmp3
else:
tmp4 = restarts[i]
restarts[i] = tmp3
tmp3 = tmp4
i += 1
return points, restarts
2024-06-01 19:24:32 +04:00
def parsedata(prev, flaunch):
2024-06-01 19:24:32 +04:00
try:
data = str(urllib.request.urlopen("http://10.0.0.28:4500/metrics").read()).split("\\n")
count = 0
i = 0
2024-06-01 19:24:32 +04:00
while data[i] != "# TYPE blocky_query_total counter":
i += 1
i += 1
while True:
if data[i] == "# HELP blocky_request_duration_ms Request duration distribution":
break
else:
count += int(re.sub('blocky_query_total{.*?} ', '', data[i]))
i += 1
2024-06-01 19:24:32 +04:00
except:
if flaunch == True:
return -1
else:
return 0, -1
2024-06-01 19:24:32 +04:00
if flaunch == True:
return count
else:
return count, count - prev
def drawout(sclpo, maxcanv, failture, restart):
match device.height:
case 32:
# todo: backport
with canvas(device) as draw:
d1 = 1
d2 = device.width - 66
now = datetime.datetime.now()
while d1 < len(sclpo):
draw.line((d2 + 10, 31 - sclpo[d1 - 1], d2, 31 - sclpo[d1]), fill="white")
d1 += 1
d2 -= 10
draw.text((device.width - 53, device.height - 19), now.strftime("%H:%M"), font=regfont, fill="white")
draw.text((0, 0), str(maxcanv), font=smafont, fill="white")
draw.text((0, device.height / 2 - 4), str(int(maxcanv / 2)), font=smafont, fill="white")
draw.text((0, device.height - 9), "0", font=smafont, fill="white")
if failture:
draw.text((device.width - 50, 0), "\uf071", font=icons, fill="white")
if restart:
draw.text((device.width - 30, 0), "\uf021", font=icons, fill="white")
case 64:
with canvas(device) as draw:
# Graph
currpoint = 1
targetpx = 118
while currpoint < len(sclpo):
draw.line((targetpx + 10, 47 - sclpo[currpoint - 1], targetpx, 47 - sclpo[currpoint]), fill="white")
draw.rectangle((targetpx - 1, 47 - sclpo[currpoint] - 1, targetpx + 1, 47 - sclpo[currpoint] + 1), fill="white")
currpoint += 1
targetpx -= 10
# Draw black boxes to make sure that graph will not interfere
draw.rectangle((0, 48, 128, 64), fill="black")
draw.rectangle((0, 0, 6 * len(str(maxcanv)), 9), fill="black")
draw.rectangle((0, 19, 6 * len(str(maxcanv // 2)), 28), fill="black")
draw.rectangle((0, 37, 6, 47), fill="black")
# Numbers on the left
draw.text((0, 0), str(maxcanv), font=smafont, fill="white")
draw.text((0, 19), str(maxcanv // 2), font=smafont, fill="white")
draw.text((0, 37), "0", font=smafont, fill="white")
# Statusbar, non-constant height values are due how fonts render here.
draw.text((75, 45), datetime.datetime.now().strftime("%H:%M"), font=regfont, fill="white")
if failture:
draw.text((2, 50), "\uf071", font=icons, fill="white")
if restart:
draw.text((22, 50), "\uf021", font=icons, fill="white")
case _:
with canvas(device) as draw:
draw.text((0,0), "Display not supported", font=smafont, fill="white")
def getdata(oldtotal):
total, query = parsedata(oldtotal, False)
if query == -1:
return oldtotal, 0, True, False
if total < oldtotal:
return total, 0, False, True
return total, query, False, False
2024-06-01 19:24:32 +04:00
def main():
print("Blocky Graph Monitor for OLED v0.7")
2024-06-01 19:24:32 +04:00
print("(C) Nikopol 2024")
2024-06-01 19:24:32 +04:00
today_last_time = "NaN"
last_hrs = "NaN"
i = 0
2024-06-01 19:24:32 +04:00
failture = False
restart = False
rstate = False
2024-06-01 19:24:32 +04:00
device.contrast(0)
pointsmap = [0] * 14
restartmap = [True] * len(pointsmap)
total = parsedata(0, True)
2024-06-01 19:24:32 +04:00
if total == -1:
failture = True
while True:
now = datetime.datetime.now()
today_time = now.strftime("%H:%M")
hrs = now.strftime("%H")
if today_time != today_last_time:
#if 1:
2024-06-01 19:24:32 +04:00
today_last_time = today_time
if hrs != last_hrs:
#if 1:
2024-06-01 19:24:32 +04:00
last_hrs = hrs
# Grab fresh data
total, query, failture, rstate = getdata(total)
#print(total)
#print(query)
# Now write it to (raw) array and move older data
pointsmap, restartmap = movepoints(pointsmap, query, restartmap, rstate)
# ...and scale it to screen size
sclpo, maxcanv = scalepoints(pointsmap)
# also write restart map
restart = False
while i < len(restartmap):
print(restartmap[i])
if restartmap[i] == True:
restart = True
i += 1
i = 0
drawout(sclpo, maxcanv, failture, restart)
2024-06-01 19:24:32 +04:00
time.sleep(1)
if __name__ == "__main__":
try:
#print("\n".join(sys.argv))
2024-06-01 19:24:32 +04:00
device = get_device()
regfont = make_font("TerminusTTF-4.49.3.ttf", 22)
smafont = make_font("TerminusTTF-4.49.3.ttf", 12)
icons = make_font("fontawesome-webfont.ttf", 15)
signal.signal(signal.SIGTERM, sigterm_handler)
main()
except KeyboardInterrupt:
pass