import re
import traceback
import datetime
import operator
import logging.config
import time
from telnetlib import Telnet
import json
import requests
try:
logging.config.fileConfig("logging.ini")
logging.getLogger("paramiko").setLevel(logging.WARNING)
except:
pass
[docs]def clamp(v: float, minimum: float, maximum: float) -> float:
"""
clamps a number to the minimum and maximum.
:param v:
:param minimum:
:param maximum:
:return:
"""
return min(max(v, minimum), maximum)
[docs]class Controller(object):
"""
controller abstract that takes a dictionary config section and sets self attributes to it.
"""
def __init__(self, config_section: dict):
self.min = 0
self.max = 1000
self.logger = logging.getLogger(str(self.__class__))
self.get_wavelength_command = \
self.set_wavelength_command = \
self.set_all_command = \
self.set_all_wavelength_command = ""
for k, v in config_section.items():
setattr(self, k, v)
def _run_command(self, cmd):
"""
unimplemented,
override this to define how the controller should do things.
:return:
"""
return False
[docs] def set_all(self, power: int = None, percent: int = None):
"""
sets all wavelengths to either an absolute value or a percentage of the total
:param power: power to set all wavelengths to
:param percent: 0-100 percentage to set the lights to.
:return: true/false depending on whether the command was successful
"""
if not self.set_all_command:
self.logger.error("set_all call without set_all_command")
return None
if percent:
power = int(self.max * (percent / 100) + self.min)
cmd = None
if "{power}" in self.set_all_command:
cmd = self.set_all_command.format(power=power)
elif "{percent}" in self.set_all_command:
cmd = self.set_all_command.format(percent=percent)
else:
self.logger.error("set_wavelength no cmd")
return None
return self._run_command(cmd)
[docs] def set_wavelength(self, wl: str, power: int = None, percent: int = None):
"""
sets a specific wavelength to a value
either a power or a percent must be specified
:param wl: string of wavelength name (eg 400nm)
:param power: absolute power value
:param percent: percent value, calculated from min/max.
"""
if not self.set_wavelength_command:
self.logger.error("set_wavelength call without set_wavelength_command")
return None
if percent:
power = int(self.max * (percent / 100) + self.min)
cmd = None
if "{power}" in self.set_wavelength_command:
cmd = self.set_wavelength_command.format(wavelength=wl, power=power)
elif "{percent}" in self.set_wavelength_command:
cmd = self.set_wavelength_command.format(wavelength=wl, percent=percent)
else:
self.logger.error("set_wavelength no cmd")
return None
return self._run_command(cmd)
[docs] def set_all_wavelengths(self, values: dict, percent=True):
"""
sets all wavelengths to specific values.
only absolute values may be specified
values should be specified as a dict of wavelength: value pairs
:param values: dict of wavelengths and their respective values
:param percent: whether the values are expressed as 0-100 or absolute.
"""
if not self.set_all_wavelength_command:
self.logger.error("set_all_wavelengths call without set_all_wavelength_command")
return None
r = re.compile(r'\d+')
def keygetter(it):
vs = r.search(it[0])
return 0 if not vs else int(vs.group())
sorted_values = sorted(values.items(), key=keygetter)
if percent:
sorted_values = [(k, int(self.max * (v / 100) + self.min)) for k, v in sorted_values]
if len(values) < self.set_all_wavelength_command.count("{}"):
self.logger.error("Not enough wavelengths specified for set_all_wavelengths, padding with 0s")
diff = self.set_all_wavelength_command.count("{}") - len(values)
sorted_values.extend(("padded", 0) for _ in range(diff))
sorted_values = [(k, clamp(v, self.min, self.max)) for k, v in sorted_values]
cmd = self.set_all_wavelength_command.format(*[v for k, v in sorted_values])
if self._run_command(cmd):
return dict([(str(k).lower(), int(v)) for k, v in sorted_values])
return {}
[docs] def get_wavelength(self, wavelength: str):
"""
gets the power of a specific wavelength
:param wavelength: wavelength string to get the power of (eg 400nm)
:return:
"""
if not self.get_wavelength_command:
self.logger.error("get_wavelength call without get_wavelength_command")
return None
cmd = None
if "{wavelength}" in self.get_wavelength_command:
cmd = self.get_wavelength_command.format(wavelength=wavelength)
return self._run_command(cmd)
[docs]class TelNetController(Controller):
"""
controller for a Light.
"""
def __init__(self, config_section):
self.ip = \
self.telnet_port = ""
super(TelNetController, self).__init__(config_section)
def _run_command(self, cmd: str, ok="OK") -> bool:
"""
sends a telnet command to the host
:param cmd:
:return: bool successful
"""
telnet = Telnet(self.ip, self.telnet_port, 60)
try:
response = telnet.read_until(b'>', timeout=0.1)
self.logger.debug("Intial response is: {0!s}".format(response.decode()))
# we MUST wait a little bit before writing to ensure that the stream isnt being written to.
time.sleep(0.5)
# encode to ascii and add LF. unfortunately this is not to the telnet spec (it specifies CR LF or LF CR I'm ns)
telnet.write(cmd.encode("ascii") + b"\n")
ok_regex = re.compile(b'.*'+ok.encode("ascii")+b'.*')
response = telnet.expect([ok_regex], timeout=30)
if response[0] < 0:
return False
else:
return True
except:
self.logger.error(traceback.format_exc())
return False
finally:
telnet.close()
[docs]class HTTPController(Controller):
def __init__(self, config_section):
self.ip = self.control_uri = ""
super(HTTPController, self).__init__(config_section)
if not self.ip.startswith("http://"):
self.ip = "http://" + self.ip
if not self.control_uri.startswith("/"):
self.control_uri = "/" + self.control_uri
def _run_command(self, cmd):
payload = json.loads("{" + cmd + "}")
response = requests.post(self.ip + self.control_uri, data=payload, timeout=10)
if response.status_code == 200:
return True
else:
return False
[docs] def kill_schedule(self):
"""
turns off all the heliospectras internal scheduling sysm
:return:
"""
# this is the payload, every timepoint (represented by "A##") is set to 0 or off
payload = {
'A00': 0,
'A01': 0,
'A02': 0,
'A03': 0,
'A04': 0,
'A05': 0,
'A06': 0,
'A07': 0,
'A08': 0,
'A09': 0,
'A10': 0,
'A11': 0,
'A12': 0,
'A13': 0,
'Submit': 'Set schedule'
}
try:
response = requests.post(self.ip + "/cgi-bin/sched.cgi", data=payload, timeout=10)
if response.status_code == 200:
return True
else:
return False
except:
return False
[docs]class HelioSpectra(object):
"""
Dumb runnner for a light, must be controlled by a chamber.
automatically scales the power from 0-100 to the provided min-max (0-1000) by default
"""
accuracy = 3
# s10 wls
s10wls = ["400nm", "420nm", "450nm", "530nm", "630nm", "660nm", "735nm"]
# these are the s20 wls.
s20wls = ["370nm", "400nm", "420nm", "450nm", "530nm", "620nm", "660nm", "735nm", "850nm", "6500k"]
def __init__(self, config):
self.name = config.get("name")
self.logger = logging.getLogger(self.name)
self.logger.info("init...")
self.config = config.copy()
self.failed = list()
self.percent = config.get("percent", True)
telnet_config = self.config.get('telnet', {})
telnet_config['ip'] = self.config.get('ip')
telnet_config['max'] = self.config.get('max_power', 1000)
telnet_config['min'] = self.config.get('min_power', 0)
self.controller = TelNetController(telnet_config)
http_config = config.get("http", {})
http_config['ip'] = self.config.get('ip')
http_config['max'] = self.config.get('max_power', 1000)
http_config['min'] = self.config.get('min_power', 0)
self.logger.info("Killing the schedule")
HTTPController(http_config).kill_schedule()
# self.wavelengths = self.config.get("wavelengths",
# fallback=["400nm", "420nm", "450nm", "530nm", "630nm", "660nm", "735nm"])
self.wavelengths = self.config.get("wavelengths", self.s20wls)
[docs] def set(self, intensities: list) -> dict:
"""
sets the lights wavelengths to the values in the list.
returns a dict of the wavelengths set and their values.
If the length of the list of intensities doesnt match the number of custom wavelengths, but does match the
length of the list of s10 or s20 wavelengths, then they will be used.
If none of those conditions are met, returns an empty dict.
:param intensities: intensities to set to
:return:
"""
intensities = list(map(float, intensities))
values = dict(zip(self.wavelengths, intensities))
if len(intensities) != len(self.wavelengths):
if len(intensities) == len(HelioSpectra.s10wls):
values = dict(zip(HelioSpectra.s10wls, intensities))
elif len(intensities) == len(HelioSpectra.s20wls):
values = dict(zip(HelioSpectra.s20wls, intensities))
else:
print("light values do not match length, {}".format(str(intensities)))
return {}
print("Setting light values: {}".format(str(values)))
return self.controller.set_all_wavelengths(values, percent=True)