import numpy
import time
import logging.config
from collections import deque
from threading import Thread
import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from xml.etree import ElementTree
try:
logging.config.fileConfig("logging.ini")
logging.getLogger("paramiko").setLevel(logging.WARNING)
except:
# it wont import if the logging file isnt present.
pass
[docs]class PanTilt(object):
"""
Control J-Systems PTZ
For new system or new firmware, the system needs calibration as follows:
- Open URL of the PTZ on a web browser
- Click on "Calibration" tab, enter username and password if necessary
- On Calibration window, click on "Open-loop" and then "Set Mode"
- Use joystick controller to rotate the pan axis to minimum position
- Click on 'Pan Axis Min' line, enter '2.0', and click "Set Calibration"
- Use joystick controller to rotate the pan axis to maximum position
- Click on 'Pan Axis Max' line, enter '358.0', and click "Set Calibration"
- Use joystick controller to rotate the tilt axis to minimum position
- Click on 'Tilt Axis Min' line, enter '-90.0', and click "Set Calibration"
- Use joystick controller to rotate the tilt axis to maximum position
- Click on 'Tilt Axis Max' line, enter '30.0', and click "Set Calibration"
- Click on "Closed-loop" and then "Set Mode"
- Close Calibration window
"""
def __init__(self, ip=None, user=None, password=None, config=None, queue=None):
self.communication_queue = deque(tuple(), 256) if queue is None else queue
self.logger = logging.getLogger("PanTilt")
if not config:
config = dict()
config = config.copy()
self.command_urls = config.get('urls', {})
self.return_keys = config.get('keys', {})
self._notified = []
self.return_parser = config.get("return_parser", "plaintext")
format_str = config.get("format_url", "http://{HTTP_login}@{ip}{command}")
self.auth_type = config.get("auth_type", "basic")
self.auth_object = None
if format_str.startswith("http://{HTTP_login}@"):
format_str = format_str.replace("{HTTP_login}@", "")
self.auth_object = HTTPBasicAuth(user or config.get("username", "admin"),
password or config.get("password", "admin"))
self.auth_object_digest = HTTPDigestAuth(config.get("username", "admin"),
config.get("password", "admin"))
self.auth_object = self.auth_object_digest if self.auth_type == "digest" else self.auth_object
self._HTTP_login = config.get("HTTP_login", "{user}:{password}").format(
user=user or config.get("username", "admin"),
password=password or config.get("password", "admin"))
self._url = format_str.format(
ip=ip or config.get("ip", "192.168.1.101:81"),
HTTP_login=self._HTTP_login,
command="{command}")
self._pan_tilt_scale = config.get("scale", 10.0)
self._pan_range = list(config.get("pan_range", [0, 360]))
self._tilt_range = list(config.get("tilt_range", [-90, 30]))
self._position = [0, 0]
self._pan_range.sort()
self._tilt_range.sort()
self._zoom_position = config.get("zoom", 800)
self._zoom_range = config.get("zoom_range", [30, 1000])
self.zoom_position = self._zoom_position
# set zoom position to fill hfov and vfov
# need to set this on camera.
# self._hfov = numpy.interp(self._zoom_position, self.zoom_list, self.hfov_list)
# self._vfov = numpy.interp(self._zoom_position, self.zoom_list, self.vfov_list)
self._accuracy = config.get("accuracy", 0.5)
self._rounding = len(str(float(self._accuracy)).split(".")[-1].replace("0", ""))
time.sleep(0.2)
self.logger.info("pantilt:".format(self.position))
[docs] def communicate_with_updater(self):
"""
communication member. This is meant to send some metadata to the updater thread.
:return:
"""
try:
data = dict(
name=self.camera_name,
identifier=self.identifier,
failed=self.failed,
last_capture=int(self.current_capture_time.strftime("%s")))
self.communication_queue.append(data)
self.failed = list()
except Exception as e:
self.logger.error("thread communication error: {}".format(str(e)))
def _make_request(self, command_string, *args, **kwargs):
"""
makes a generic request formatting the command string and applying the authentication.
:param command_string:
:param args:
:param kwargs:
:return:
"""
url = self._url.format(*args, command=command_string, **kwargs)
if "&" in url and "?" not in url:
url = url.replace("&", "?", 1)
response = None
try:
response = requests.get(url, auth=self.auth_object)
if response.status_code == 401 and self.auth_type != "digest":
self.logger.debug("Auth is not basic, trying digest")
response = requests.get(url, auth=self.auth_object_digest)
self.logger.debug(response.text)
except Exception as e:
self.logger.error("Some exception got raised {}".format(str(e)))
return
if response.status_code not in [200, 204]:
self.logger.error("[{}] - {}\n{}".format(str(response.status_code), str(response.reason), str(response.url)))
return
return response
def _read_stream(self, command_string, *args, **kwargs):
"""
opens a url with the current HTTP_login string
:type command_string: str
:param command_string: url to go to with parameters
:return: string of data returned from the camera
"""
response = self._make_request(command_string, *args, **kwargs)
if response is None:
return
return response.text
def _read_stream_raw(self, command_string, *args, **kwargs):
"""
opens a url with the current HTTP_login string
:type command_string: str
:param command_string: url to go to with parameters
:return: string of data returned from the camera
"""
response = self._make_request(command_string, *args, **kwargs)
if response is None:
return
return response.content
def _get_cmd(self, cmd):
cmd_str = self.command_urls.get(cmd, None)
if not cmd_str and cmd_str not in self._notified:
self.logger.error("No command available for \"{}\"".format(cmd))
self._notified.append(cmd_str)
return None, None
keys = self.return_keys.get(cmd, [])
if type(keys) not in (list, tuple):
keys = [keys]
return cmd_str, keys
@staticmethod
[docs] def get_value_from_xml(message_xml, *args):
"""
gets float, int or string values from a xml string where the key is the tag of the first element with value as
text.
:param message_xml: the xml to searach in.
:param args: list of keys to find values for.
:rtype: dict
:return: dict of arg: value pairs requested
"""
return_values = dict()
if not len(args):
return return_values
if not len(message_xml):
return return_values
# apparently, there is an issue parsing when the ptz returns INVALID XML (WTF?)
# these seem to be the tags that get mutilated.
illegal = ['\n', '\t', '\r',
"<CPStatusMsg>", "</CPStatusMsg>", "<Text>",
"</Text>", "<Type>Info</Type>", "<Type>Info",
"Info</Type>", "</Type>", "<Type>"]
for ill in illegal:
message_xml = message_xml.replace(ill, "")
root_element = ElementTree.Element("invalidation_tag")
try:
root_element = ElementTree.fromstring(message_xml)
except Exception as e:
print(str(e))
print("Couldnt parse XML!!!")
print(message_xml)
return_values = dict()
for key in args:
target_ele = root_element.find(key)
if target_ele is None:
continue
value = target_ele.text.replace(' ', '')
if value is None:
continue
types = [float, int, str]
for t in types:
try:
return_values[key] = t(value)
break
except ValueError:
pass
else:
print("Couldnt cast an xml element text attribute to str. What are you feeding the xml parser?")
return return_values
@staticmethod
[docs] def get_value_from_plaintext(message, *args):
"""
gets float, int or string values from a xml string where the key is the tag of the first element with value as
text.
:param message:
:param args: list of keys to find values for.
:rtype: dict
:return: dict of arg: value pairs requested
"""
return_values = dict()
if not len(args):
return return_values
if not len(message):
return return_values
for line in message.split("\n"):
line = line.replace("= ","=").replace(" =","=").strip()
name, value = line.partition("=")[::2]
name, value = name.strip(), value.strip()
types = [float, int, str]
if name in args:
for t in types:
try:
v = t(value)
if str(v).lower() in ['yes', 'no', 'true', 'false', 'on', 'off']:
v = str(v).lower() in ['yes', 'true', 'on']
return_values[name] = v
break
except ValueError:
pass
else:
print("Couldnt cast an plaintext element text attribute to str. What are you feeding the parser?")
return return_values
[docs] def get_value_from_stream(self, stream, *keys):
if stream is None: return
if len(keys) is 0: return
if self.return_parser == 'plaintext':
return self.get_value_from_plaintext(stream, *keys)
elif self.return_parser == 'xml':
return self.get_value_from_xml(stream, *keys)
else:
return None
[docs] def pan_step(self, direction, n_steps):
"""
pans by step, steps must be less than or equal to 127
:type n_steps: int
:type direction: str
:param direction:
:param n_steps: integer <= 127. number of steps
:return:
"""
assert (abs(n_steps) <= 127)
cmd, key = self._get_cmd("pan_step")
if not cmd:
return
amt = -n_steps if direction.lower() == "left" else n_steps
stream = self._read_stream(cmd.format(pan=amt))
return self.get_value_from_stream(stream, *key)
[docs] def tilt_step(self, direction, n_steps):
"""
tilts by step, steps must be less than or equal to 127
:type n_steps: int
:type direction: str
:param direction:
:param n_steps: integer <= 127. number of steps
:return:
"""
assert (abs(n_steps) <= 127)
amt = -n_steps if direction.lower() == "down" else n_steps
cmd, key = self._get_cmd("tilt_step")
if not cmd:
return
stream = self._read_stream(cmd.format(tilt=amt))
return self.get_value_from_stream(stream, *key)
@property
def zoom_position(self):
"""
Zoom Position.
:getter: from camera.
:setter: to camera.
:rtype: tuple
"""
cmd, keys = self._get_cmd("get_zoom")
if cmd:
try:
stream_output = self._read_stream(cmd)
self._zoom_position = self.get_value_from_stream(stream_output, keys) or self._zoom_position
except:
pass
return self._zoom_position
@zoom_position.setter
def zoom_position(self, absolute_value):
cmd, keys = self._get_cmd("set_zoom")
if cmd:
assert (self._zoom_range is not None and absolute_value is not None)
assert type(absolute_value) in (float, int)
absolute_value = min(self._zoom_range[1], max(self._zoom_range[0], absolute_value))
try:
stream_output = self._read_stream(cmd.format(zoom=absolute_value))
value = self.get_value_from_stream(stream_output, *keys)
if value:
self._zoom_position = value
except:
pass
else:
self._zoom_position = absolute_value
@property
def zoom_range(self):
"""
Range of zoom for the camera.
:getter: from camera.
:setter: cached.
:rtype: tuple
"""
cmd,key = self._get_cmd("get_zoom_range")
if not cmd:
return self._zoom_range
stream_output = self._read_stream(cmd)
self._zoom_range = self.get_value_from_stream(stream_output, *key) or self._zoom_range
return self._zoom_range
@zoom_range.setter
def zoom_range(self, value):
assert type(value) in (list, tuple), "must be either list or tuple"
assert len(value) == 2, "must be 2 values"
self._zoom_range = list(value)
@property
def zoom_list(self) -> list:
"""
List of zoom value intervals.
Setting this also affects the state of other related variables.
:getter: cached.
:setter: recalculates, recalculates fov lists, resets zoom_position.
:rtype: list
"""
return self._zoom_list
@zoom_list.setter
def zoom_list(self, value):
assert type(value) in (list, tuple), "Must be a list or tuple"
assert len(value) > 1, "Must have more than one element"
self._zoom_list = list(value)
it = iter(self._hfov_list)
self._hfov_list = [next(it, self._hfov_list[-1]) for _ in self._zoom_list]
it = iter(self._vfov_list)
self._vfov_list = [next(it, self._vfov_list[-1]) for _ in self._zoom_list]
self.zoom_position = self._zoom_position
@property
def position(self):
"""
gets the current pan/tilt position.
:return: tuple (pan, tilt)
"""
cmd, keys = self._get_cmd("get_pan_tilt")
if not cmd:
return
p = [None]
try:
output = self._read_stream(cmd)
values = self.get_value_from_stream(output, *keys)
p = tuple(values.get(k, None) for k in keys)
except Exception as e:
self.logger.error("ERROR: {}".format(str(e)))
if not any(p):
return self._position
else:
self._position = p
return self._position
def _get_pos(self):
"""
slightly faster and less robust method of getting the position.
:return:
"""
cmd, keys = self._get_cmd("get_pan_tilt")
if cmd is None:
return
output = self._read_stream(cmd)
values = self.get_value_from_stream(output, *keys)
p = tuple(values.get(k, None) for k in keys)
if not any(p):
return None
return p
@position.setter
def position(self, position=(None, None)):
"""
Sets the absolute pan/tilt position in degrees.
float degree values are floored to int.
:type position: tuple
:param position: absolute degree value for pan,tilt as (pan,tilt)
:return:
"""
pan_degrees, tilt_degrees = position
start_pos = self._get_pos()
if not start_pos:
return
cmd, keys = self._get_cmd("set_pan_tilt")
if not cmd:
return
if pan_degrees is None:
pan_degrees = start_pos[0]
if tilt_degrees is None:
tilt_degrees = start_pos[1]
pan_degrees, tilt_degrees = round(pan_degrees, 1), round(tilt_degrees, 1)
pd = min(self._pan_range[1], max(self._pan_range[0], pan_degrees))
td = min(self._tilt_range[1], max(self._tilt_range[0], tilt_degrees))
diff = abs(self._position[0] - pd) + abs(self._position[1] - td)
if diff <= self._accuracy:
return
if td != tilt_degrees or pd != pan_degrees:
self.logger.error("hit pantilt limit")
self.logger.error("{} [{}] {} ....... {} [{}] {}".format(
self._pan_range[0], pan_degrees, self._pan_range[1],
self._tilt_range[0], tilt_degrees, self._tilt_range[1]))
pan_degrees, tilt_degrees = pd, td
cmd = cmd.format(pan=pan_degrees * self._pan_tilt_scale,
tilt=tilt_degrees * self._pan_tilt_scale)
for x in range(120):
try:
text = self._read_stream(cmd)
if not text: # this breaks the next part because some ptzs return no-content on change.
break
etr = ElementTree.fromstring(text)
ele = (etr.findall(".//Type") or [None])[0]
if getattr(ele, "text", None) == "Info":
print("")
break
if x == 0:
print("Waiting on ptz.", end="")
else:
print(".", end="")
time.sleep(0.1)
except Exception as e:
self.logger.error("ERROR: {}".format(str(e)))
time.sleep(1)
else:
self.logger.error("Couldn't set the pantilt position.")
self.logger.error(self._read_stream(cmd))
# loop until within 1 degree
pan_pos, tilt_pos = None, None
for _ in range(120):
time.sleep(0.05)
p = self._get_pos()
if not p:
continue
pan_pos, tilt_pos = p
pan_diff = abs(pan_pos - pan_degrees)
tilt_diff = abs(tilt_pos - tilt_degrees)
if pan_diff <= self._accuracy and tilt_diff <= self._accuracy:
break
else:
self.logger.warning("Warning: pan-tilt fails to move to correct location")
self.logger.warning(" Desired: pan_pos={}, tilt_pos={}".format(pan_degrees, tilt_degrees))
self.logger.warning(" Current: pan_pos={}, tilt_pos={}".format(pan_pos, tilt_pos))
# loop until smallest distance is reached
for _ in range(0, 100):
time.sleep(0.05)
p = self._get_pos()
if not p:
continue
pan_pos, tilt_pos = p
pan_diff_new = abs(pan_pos - pan_degrees)
tilt_diff_new = abs(tilt_pos - tilt_degrees)
if pan_diff_new >= pan_diff or tilt_diff_new >= tilt_diff:
break
else:
pan_diff = pan_diff_new
tilt_diff = tilt_diff_new
pn = self._position
self._position = self.position
# print("moved {}° | {}°".format(round(pd-pn[0], self._rounding), round(td-pn[1], self._rounding)))
@property
def scale(self):
return self._pan_tilt_scale
@scale.setter
def scale(self, value):
self._pan_tilt_scale = value
@property
def pan(self):
return self.position[0]
@pan.setter
def pan(self, value):
self.position = (value, None)
@property
def pan_range(self):
return self._pan_range
@pan_range.setter
def pan_range(self, value):
assert type(value) in (list, tuple), "must be a list or tuple"
assert len(value) == 2, "must have 2 elements"
self._pan_range = sorted(list(value))
@property
def tilt(self):
return self.position[1]
@tilt.setter
def tilt(self, value):
self.position = (None, value)
@property
def tilt_range(self):
return self._tilt_range
@tilt_range.setter
def tilt_range(self, value):
assert type(value) in (list, tuple), "must be a list or tuple"
assert len(value) == 2, "must have 2 elements"
self._tilt_range = sorted(list(value))
[docs] def hold_pan_tilt(self, state):
"""
unknown, presumably holds the pan-tilt in one place.
doesnt work...
:param state: ? beats me.
:return:
"""
cmd_str = "/Calibration.xml?Action=0" if state is True else "/Calibration.xml?Action=C"
output = self._read_stream(cmd_str)
# apparently this was left here?
print(output)
return self.get_value_from_stream(output, "Text")
@property
def PCCWLS(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "PCCWLS")
@property
def PCWLS(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "PCWLS")
@property
def TDnLS(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "TDnLS")
@property
def TUpLS(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "TUpLS")
@property
def battery_voltage(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "BattV")
@property
def heater(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "Heater")
@property
def temp_f(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "Temp")
@property
def list_state(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "ListState")
@property
def list_index(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "ListIndex")
@property
def control_mode(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "CtrlMode")
@property
def auto_patrol(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "AutoPatrol")
@property
def dwell(self):
output = self._read_stream("/CP_Update.xml")
return self.get_value_from_stream(output, "Dwell")
[docs]class ThreadedPTZ(Thread):
def __init__(self, *args, **kwargs):
if hasattr(self, "identifier"):
Thread.__init__(self, name=self.identifier)
else:
Thread.__init__(self)
print("Threaded startup")
super(ThreadedPTZ, self).__init__(*args, **kwargs)
self.daemon = True
[docs]class ThreadedPanTilt(ThreadedPTZ, PanTilt):
def __init__(self, *args, **kwargs):
self.identifier = "J-Systems PanTilt"
PanTilt.__init__(self, *args, **kwargs)
super(ThreadedPanTilt, self).__init__(*args, **kwargs)
[docs] def run(self):
super(PanTilt, self).run()