Source code for libs.SysUtil

import subprocess
import random, string, os, socket, json, time
from glob import glob
from urllib import request
import threading
import configparser
import yaml
import logging
import logging.config
import fcntl
import datetime
import collections
from dateutil import parser
import traceback
from zlib import crc32

USBDEVFS_RESET = 21780
try:
    logging.config.fileConfig("logging.ini")
except:
    pass


[docs]def sizeof_fmt(num, suffix='B'): for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, 'Yi', suffix)
default_config = """ [DEFAULT] exposure = 0 enabled = on resize = on [camera] name = enabled = on [ftp] enabled = on replace = on resize = on timestamped = on server = sftp.traitcapture.org directory = /picam username = picam password = DEFAULT_PASSWORD [timelapse] interval = 300 starttime = 00:00 stoptime = 23:59 [localfiles] spooling_dir = upload_dir = """ default_light_config = """ [light] max_power = 1000 min_power = 0 wavelengths = "400nm,420nm,450nm,530nm,630nm,660nm,735nm" csv_keys = "LED1,LED2,LED3,LED4,LED5,LED6,LED7" file_path = "lights_byserial/{identifier}.scf" [telnet] telnet_host = "192.168.2.124" telnet_port = 50630 set_all_command = setall {power} set_wavelength_command = setwlrelpower {wavelength} {power} set_all_wavelength_command = setwlsrelpower {} {} {} {} {} {} {} get_wavelength_command = getwlrelpower {wavelength} [url] url_host = "192.168.2.124" control_uri = /cgi-bin/userI.cgi set_all_command = "setAllTo": {percent}, "setAllSub": "set" set_all_wavelength_command = "wl1":{}, "wl2":{}, "wl3":{}, "wl4":{}, "wl5":{}, "wl6":{}, "wl7":{} """ default_chamber_config = """ # this file should live in "spc-eyepi/chambers_byip/<ip>.ini" # this file shoudl be accompanied by a "spc-eyepi/light_configs_byip/<ip>" .csv or .slc file [chamber] name = GC36 temperature_multiplier = 10.0 file_path = "chambers_byip/{identifier}.scf" [telnet] telnet_host = 192.168.0.36 telnet_port = 50630 telnet_user = root telnet_password = froot deviceid = 0 # this should contain the ip address for a light, or no ip if [light] ip = 192.168.2.124 """
[docs]def recursive_update(d, u): if d is None: d = dict() for k, v in u.items(): if isinstance(v, collections.Mapping): r = recursive_update(d.get(k, dict()), v) d[k] = r else: d[k] = u[k] return d
[docs]def get_generator(fh): while True: data = fh.readline() if not data: break yield data
[docs]class LazySolarCalcReader(object): def __init__(self, fn): self._fn = fn self._fh = open(self._fn) self._rewind() def _rewind(self, index=0): self._fh.seek(0) self._generator = get_generator(self._fh) self._index = 0 while self._index < index: try: next(self._generator) self._index += 1 except StopIteration: self._rewind(index=0) break def _getitem_slice(self, slice): start, stop, step = slice.start, slice.stop, slice.step start = 0 if start is None else start stop = float("inf") if stop is None else stop step = 1 if step is None else step if stop is not None and stop < start: start, stop = stop, start step *= -1 r = [] ii = start while ii <= stop: try: r.append(self[ii]) ii += step except IndexError: break return r def _parse_line(self, line_str: str) -> list: """ parses a string into a list that can be read by the chamber :param line_str: :return: list of values. """ line = line_str.strip().split(",") def f(v): try: return float(v) except: return v if len(line) in (16, 13): try: return [ parser.parse("{} {}".format(line[0], line[1]), dayfirst=True), *map(f, line[2:-1]), parser.parse(line[-1]) ] except Exception as e: traceback.print_exc() else: try: return [ parser.parse(line[0], dayfirst=True), *map(f, line[1:-1]), parser.parse(line[-1]) ] except: traceback.print_exc() return list(map(f, line)) def __len__(self): tempindex = 0 self._rewind() while True: try: next(self._generator) tempindex += 1 except StopIteration: break self._rewind() return tempindex def _getitem_int(self, index): if index < 0: index = len(self)+index if index < 0: raise IndexError if index == 0: self._rewind() if index < self._index: self._rewind() v = "" while self._index <= index: try: v = next(self._generator) self._index += 1 except StopIteration: raise IndexError return self._parse_line(v) def __getitem__(self, index): if isinstance(index, slice): return self._getitem_slice(index) elif isinstance(index, int): return self._getitem_int(index) def __next__(self): v = self._parse_line(next(self._generator)) self._index += 1 return v def __iter__(self): self._rewind() for v in self._generator: yield self._parse_line(v) def __del__(self): self._fh.close()
[docs]class SysUtil(object): """ System utility class. Helper class to cache various things like the hostname, machine-id, amount of space in the filesystem. """ _ip_address = "0.0.0.0", 0 _external_ip = "0.0.0.0", 0 _machine_id = "", 0 _hostname = "HOSTNAME", 0 _tor_host = ("unknown.onion", "not a real key", "not a real client"), 0 _version = "Unknown spc-eyepi version", 0 a_statvfs = os.statvfs("/") _fs = (a_statvfs.f_frsize * a_statvfs.f_bavail, a_statvfs.f_frsize * a_statvfs.f_blocks), 0 _watches = list() thread = None stop = False logger = logging.getLogger("SysUtil") def __init__(self): if SysUtil.thread is None: SysUtil.thread = threading.Thread(target=self._thread) SysUtil.thread.start() pass
[docs] @staticmethod def write_global_config(data: dict, path_override=None): """ Writes a global configuration to the global_config.yml :param data: dict of data to write to the config """ path = "/home/spc-eyepi/{}.yml".format(SysUtil.get_hostname()) if path_override: path = path_override with open(path, 'r') as fh: current_config = yaml.load(fh.read()) if not type(current_config) is dict: current_config = {} current_config = recursive_update(current_config, data) yml = yaml.dump(current_config, default_flow_style=False) if SysUtil.get_checksum(path) != SysUtil.get_checksum_from_str(yml): with open(path, 'w') as fh: fh.write(yml)
[docs] @staticmethod def reset_usb_device(bus: int, dev: int) -> bool: """ resets a usb device. :param bus: bus number :type bus: int :param dev: device number of the device on the bus above :type dev: int """ try: fn = "/dev/bus/usb/{bus:03d}/{dev:03d}".format(bus=bus, dev=dev) with open(fn, 'w', os.O_WRONLY) as f: fcntl.ioctl(f, USBDEVFS_RESET, 0) return True except Exception as e: SysUtil.logger.error("Couldnt reset usb device (possible FileNotFound error): {}".format(str(e)))
[docs] @staticmethod def get_checksum(fp: str) -> str: """ gets the string checksum for a file, or returns random if the file doesnt exist :param fp: file path of the file. :return: crc32 checksum of the file at fp, or random letters """ checksum = "".join([random.choice(string.ascii_letters) for _ in range(8)]) if not os.path.exists(fp): return checksum with open(fp, 'rb') as f: checksum = "{:X}".format(crc32(f.read())) return checksum
[docs] @staticmethod def get_checksum_from_str(input_data) -> str: """ gets the string checksum for some bytes (or a string). coerces strings using utf-8 :param input_data: file path of the file. :return: crc32 checksum of the bytes """ if type(input_data) is str: input_data = bytes(input_data, 'utf-8') return "{:X}".format(crc32(input_data))
[docs] @staticmethod def reboot(): os.system("reboot")
[docs] @staticmethod def default_identifier(prefix=None): """ returns an identifier, If no prefix available, generates something. :param prefix: :return: string of the itentifier. :rtype: str """ if prefix: return SysUtil.get_identifier_from_name(prefix) else: from hashlib import md5 serialnumber = ("AUTO_" + md5(bytes(prefix, 'utf-8')).hexdigest()[len("AUTO_"):])[:32] SysUtil.logger.warning("using autogenerated serialnumber {}".format(serialnumber)) return serialnumber
@staticmethod def _nested_lookup(key, document): """ nested document lookup, works on dicts and lists :param key: string of key to lookup :param document: dict or list to lookup :return: yields item """ if isinstance(document, list): for d in document: for result in SysUtil._nested_lookup(key, d): yield result if isinstance(document, dict): for k, v in document.items(): if k == key: yield v elif isinstance(v, dict): for result in SysUtil._nested_lookup(key, v): yield result elif isinstance(v, list): for d in v: for result in SysUtil._nested_lookup(key, d): yield result
[docs] @staticmethod def sizeof_fmt(num, suffix='B')->str: """ formats a number of bytes in to a human readable string. returns in SI units eg sizeof_fmt(1234) returns '1.2KiB' :param num: number of bytes to format :param suffix: the suffix to use :return: human formattted string. :rtype: str """ for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, 'Yi', suffix)
[docs] @classmethod def update_from_git(cls): """ updates spc-eyepi from git. """ os.system("git fetch --all;git reset --hard origin/master") os.system("systemctl restart spc-eyepi_capture.service")
[docs] @classmethod def get_hostname(cls)->str: """ gets the current hostname. if there is no /etc/hostname file, sets the hostname randomly. :return: the current hostname or the hostname it was set to :rtype: str """ if abs(cls._hostname[-1] - time.time()) > 10: if not os.path.isfile("/etc/hostname"): hostname = "".join(random.choice(string.ascii_letters) for _ in range(8)) os.system("hostname {}".format(cls._hostname)) else: with open("/etc/hostname", "r") as fn: hostname = fn.read().strip() cls._hostname = hostname, time.time() return cls._hostname[0]
[docs] @classmethod def set_hostname(cls, hostname: str): """ sets the machines hosname, in /etc/hosts and /etc/hostname :param hostname: the string of which to set the hostname to. """ try: with open(os.path.join("/etc/", "hostname"), 'w') as f: f.write(hostname + "\n") with open(os.path.join("/etc/", "hosts"), 'w') as hosts_file: h_tmpl = "127.0.0.1\tlocalhost.localdomain localhost {hostname}\n" h_tmpl += "::1\tlocalhost.localdomain localhost {hostname}\n" hosts_file.write(h_tmpl.format(hostname=hostname)) except Exception as e: cls.logger.error("Failed setting hostname for machine. {}".format(str(e)))
[docs] @classmethod def get_machineid(cls)->str: """ gets the machine id, or initialises the machine id if it doesnt exist. :return: string of the machine-id :rtype: str """ if abs(cls._machine_id[-1] - time.time()) > 10: if not os.path.isfile("/etc/machine-id"): os.system("systemd-machine-id-setup") with open("/etc/machine-id") as f: cls._machine_id = f.read().strip(), time.time() return cls._machine_id[0]
[docs] @classmethod def get_tor_host(cls)->tuple: """ gets a tuple of the current tor host. :return: tuple of hostname(onion address), client key, client name :rtype: tuple[str, str, str] """ if abs(cls._tor_host[-1] - time.time()) > 10: try: with open("/home/tor_private/hostname") as f: onion_address = f.read().replace('\n', '') cls._tor_host = onion_address.split(" ")[:3], time.time() except: cls._tor_host = ("unknown", 'unknown', "unknown"), time.time() return cls._tor_host[0]
[docs] @classmethod def get_fs_space(cls)->tuple: """ returns free/total space of root filesystem as bytes(?) :return: tuple of free/total space :rtype: tuple[int, int] """ if abs(cls._fs[-1] - time.time()) > 10: try: a_statvfs = os.statvfs("/") cls._fs = ( a_statvfs.f_frsize * a_statvfs.f_bavail, a_statvfs.f_frsize * a_statvfs.f_blocks), time.time() except: cls._fs = (0, 0), time.time() return cls._fs[0]
[docs] @classmethod def get_fs_space_mb(cls)->tuple: """ returns the filesystems free space in mebibytes. see :func:`get_fs_space` :return: tuple of free/total space :rtype:tuple[int, int] """ free_space, total_space = SysUtil.get_fs_space() for x in range(0, 2): free_space /= 1024.0 total_space /= 1024.0 return free_space, total_space
[docs] @classmethod def get_version(cls)->str: """ gets the "describe" version of the current git repo as a string. :return: the current version :rtype: str """ if abs(cls._version[-1] - time.time()) > 10: try: cmd = "/usr/bin/git describe --always" cls._version = subprocess.check_output([cmd], shell=True).decode().strip("\n"), time.time() except: cls._version = "unknown", time.time() return cls._version[0]
[docs] @classmethod def get_internal_ip(cls): """ gets the internal ip by attempting to connect to googles DNS :return: the current internal ip :rtype: str """ if abs(cls._ip_address[-1] - time.time()) > 10: try: try: import netifaces ip = netifaces.ifaddresses("tun0")[netifaces.AF_INET][0]["addr"] cls._ip_address = ip, time.time() except: import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 0)) cls._ip_address = s.getsockname()[0], time.time() except: cls._ip_address = "0.0.0.0", time.time() return cls._ip_address[0]
[docs] @classmethod def get_log_files(cls) -> list: """ returns the spc-eyepi log files that have been rotated. :return: list of filenames :rtype: list(str) """ return list(glob("/home/spc-eyepi/spc-eyepi.log.*"))
[docs] @classmethod def clear_files(cls, filenames: list): """ removes all files in the list provided, skipping and logging on an error removing todo: Do different things based on whether is a directory. :param filenames: list of directories or files :type filenames: list or tuple """ for f in filenames: try: os.remove(f) except FileNotFoundError as e: cls.logger.debug(str(e)) except IsADirectoryError as e: cls.logger.error(str(e)) except Exception as e: cls.logger.error(str(e))
[docs] @classmethod def get_isonow(cls): """ gets the current time as an iso8601 string :return: the current time as iso8601 :rtype: str """ return datetime.datetime.now().isoformat()
[docs] @classmethod def get_external_ip(cls): """ returns the external IP address of the raspberry pi through api.ipify.org :return: the external ip address :rtype: str """ if abs(cls._external_ip[-1] - time.time()) > 60: try: url = 'https://api.ipify.org/?format=json' response = request.urlopen(url, timeout=10).read().decode('utf-8') cls._external_ip = json.loads(response)['ip'], time.time() except: cls._external_ip = "0.0.0.0", time.time() return cls._external_ip[0]
[docs] @classmethod def get_identifier_from_name(cls, name): """ returns either the identifier (from name) or the name filled with the machine id clamps to 32 characters. :param name: name to fill :type name: str :return: filled name :rtype: str """ identifier = "".join((x if idx > len(name) - 1 else name[idx] for idx, x in enumerate(cls.get_machineid()))) return identifier[:32]
[docs] @classmethod def get_identifier_from_filename(cls, file_name): """ returns either the identifier (from the file name) or the name filled with the machine id :param file_name: filename :type file_name: str :return: string identifier, :rtype: str """ fsn = next(iter(os.path.splitext(os.path.basename(file_name))), "") return cls.get_identifier_from_name(fsn)
[docs] @classmethod def ensure_config(cls, identifier): """ ensures a configuration file exists for this identifier. if a config file doesnt exist then it will create a default one. :param identifier: identifier to create or find a configuration file for. :type identifier: str :return: the configuration file dict or configparser object. :rtype: dict or configparser.ConfigParser """ config = configparser.ConfigParser() config.read_string(default_config) path = cls.identifier_to_ini(identifier) try: if len(config.read(path)): return config except Exception as e: print(str(e)) if not config['localfiles']['spooling_dir']: config['localfiles']['spooling_dir'] = "/home/images/spool/{}".format(identifier) if not config['localfiles']['upload_dir']: config['localfiles']['upload_dir'] = "/home/images/upload/{}".format(identifier) if not config['camera']['name']: config['camera']['name'] = cls.get_hostname() + identifier[:6] cls.write_config(config, identifier) return config
[docs] @classmethod def write_config(cls, config: configparser.ConfigParser, identifier: str, prefix="configs_byserial"): """ writes a configuration file to an correct config file path. :param config: configuration file (configparser object) :type identifier: str :param identifier: identifier to user as the raget file name. :return: configparser object """ path = SysUtil.identifier_to_ini(identifier, prefix=prefix) with open(path, 'w+') as configfile: config.write(configfile) return config
[docs] @classmethod def identifier_to_ini(cls, identifier: str, prefix="configs_byserial")->str: """ gets a valid .ini path for an identifier. :param identifier: identifier to find an ini for. :return: file path for identifier :rtype: str """ for fn in glob("{prefix}/*.ini".format(prefix=prefix)): if identifier == cls.get_identifier_from_filename(fn): return fn else: return os.path.join("{prefix}/".format(prefix=prefix), identifier) + ".ini"
[docs] @classmethod def load_or_fix_solarcalc(cls, fp: str)-> LazySolarCalcReader: """ function to either load an existing fixed up solarcalc file or to coerce one into the fixed format. :param identifier: identifier of the light for which the solarcalc file exists. :type identifier: str :return: light timing data as a list of lists. :rtype: list(list()) """ lx = [] path, ext = os.path.splitext(fp) if ext not in (".csv", ".slc"): raise ValueError("Only .csv or .slc files are supported") if not os.path.isfile(fp): SysUtil.logger.error("no SolarCalc file.") raise FileNotFoundError() return LazySolarCalcReader(fp)
# headerstart = ['datetime', 'temp', 'relativehumidity'] # headerend = ['total_solar_watt', 'simulated_datetime'] # fill = "LED{}" # # if not os.path.isfile(fp): # SysUtil.logger.error("no SolarCalc file.") # raise FileNotFoundError() # if ext == ".slc": # with open(fp) as f: # lx = [x.strip().split(",") for x in f.readlines()] # for i,l in enumerate(lx): # lx[i][0] = parser.parse(lx[i][0]) # lx[i][-1] = parser.parse(lx[i][-1]) # else: # def get_lines(fh): # with open(fh) as f: # for line_str in f.readlines(): # try: # line = line_str.strip().split(",") # l = [ # parser.parse("{} {}".format(line[0], line[1])).isoformat(), # *line[2:-1], # parser.parse(line[-1]).isoformat() # ] # yield l # except Exception as e: # SysUtil.logger.error("Couldnt fix solarcalc file. {}".format(str(e))) # traceback.print_exc() # # with open(path + ".slc", 'w') as slc: # for line in get_lines(fp): # slc.write(",".join(line)+"\n") # lx.append(line) # # # led_fields = [fill.format(i) for i in range(len(lx[0])-len(headerstart)-len(headerend))] # # header = headerstart + led_fields + headerend # # lx.insert(0, header) # return lx[1:]
[docs] @classmethod def identifier_to_yml(cls, identifier: str)->str: """ the same as identifier_to_ini but for yml files :param identifier: identifier for a matching yml file. :type identifier: str :return: string filepath for the yml file. :rtype: str """ for fn in glob("configs_byserial/*.yml"): if identifier == cls.get_identifier_from_filename(fn): return fn else: return os.path.join("configs_byserial/", identifier) + ".yml"
[docs] @classmethod def configs_from_identifiers(cls, identifiers: set) -> dict: """ given a set of identifiers, returns a dictionary of the data contained in those config files with the key for each config file data being the identifier :param identifiers: :type identifiers: list(str) :return: dictionary of configuration datas :rtype: dict(str: dict) """ data = dict() for ini in ["configs_byserial/{}.ini".format(x) for x in identifiers]: cfg = configparser.ConfigParser() cfg.read(ini) d = dict() d = {section: dict(cfg.items(section)) for section in cfg.sections()} data[cls.get_identifier_from_filename(ini)] = d return data
[docs] @classmethod def add_watch(cls, path: str, callback): """ adds a watch that calls the callback on file change :param path: path of the file to watch :type path: str :param callback: function signature to call when the file is changed """ cls._watches.append((path, os.stat(path).st_mtime, callback))
[docs] @classmethod def open_yaml(cls, filename): """ opens a yaml file using yaml.load :param filename: yaml file to load :return: dictionary of values in yaml file :rtype: dict """ try: with open(filename) as e: q = yaml.load(e.read()) return q except Exception as e: print(str(e)) return dict()
@classmethod def _thread(cls): """ runs the watchers """ while True and not cls.stop: try: for index, (path, mtime, callback) in enumerate(cls._watches): tmt = os.stat(path).st_mtime if tmt != mtime: cls._watches[index] = (path, tmt, callback) try: print("calling {}".format(callback)) callback() except Exception as e: print(str(e)) time.sleep(1) except Exception as e: break cls.thread = None