Source code for libs.Sensor

import datetime
import logging.config
import os
import time
from collections import deque
import shutil
from threading import Thread, Event
from libs.SysUtil import SysUtil
import csv, json
import traceback

try:
    logging.config.fileConfig("logging.ini")
    logging.getLogger("paramiko").setLevel(logging.WARNING)
except:
    pass

try:
    from sense_hat import SenseHat
except Exception as e:
    logging.warning("Couldnt import sensehat: {}".format(str(e)))

try:
    import Adafruit_DHT
except Exception as e:
    logging.warning("Couldnt import Adafruit_DHT: {}".format(str(e)))

try:
    import telegraf
except Exception as e:
    logging.error("Couldnt import Telegraf, not sending metrics: {}".format(str(e)))


[docs]def round_to_1dp(n): return round(n, 1)
[docs]class Sensor(Thread): """ Sensor base. To use this class you need to override 'get_measurement()' so that it returns a tuple of the measurements that match the headers defined in the data_headers classvar. by default it will write 5 files, rolling 24 hour files (csv, tsv & json) and all time files that are appended to (csv & tsv only) """ accuracy = 1 data_headers = tuple() timestamp_format = "%Y-%m-%dT%H:%M:%S" def __init__(self, identifier: str, config: dict = None, queue: deque = None, write_out: bool = True, interval: int = 60, **kwargs): # identifier is NOT OPTIONAL! super().__init__(name=identifier) print("Thread started {}: {}".format(self.__class__, identifier)) # data headers need to be set if queue is None: queue = deque(tuple(), 256) self.communication_queue = queue self.logger = logging.getLogger(identifier) self.stopper = Event() self.identifier = identifier if config: interval = config.get("interval", interval) # interval in seconds self.interval = interval # chunking interval in number of datapoints dlen = int(86400 / interval) # setup a deque of measurements self.measurements = deque(maxlen=dlen) self.write_out = write_out out_dir = os.path.join(os.getcwd(), "sensors", self.identifier) self.output_dir = config.get("output_dir", out_dir) if write_out: if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) self.current_capture_time = datetime.datetime.now() self.failed = list()
[docs] @staticmethod def timestamp(tn: datetime.datetime) -> str: """ creates a properly formatted timestamp. :param tn: datetime to format to timestream timestamp string :return: """ return tn.strftime('%Y_%m_%d_%H_%M_%S')
[docs] @staticmethod def time2seconds(t: datetime) -> int: """ converts a datetime to an integer of seconds since epoch """ try: return int(t.timestamp()) except: # only implemented in python3.3 # this is an old compatibility thing return t.hour * 60 * 60 + t.minute * 60 + t.second
@property def timestamped_filename(self) -> str: """ builds a timestamped image basename without extension from a datetime. :param time_now: :return: string image basename """ return '{sensor_name}_{timestamp}'.format(sensor_name=self.identifier, timestamp=Sensor.timestamp(self.current_capture_time)) @property def time_to_measure(self) -> bool: """ filters out times for mesauring, returns True by default returns False if the conditions where the sensor should NOT capture are met. :return: """ # data capture interval if not (self.time2seconds(self.current_capture_time) % self.interval < Sensor.accuracy): return False return True
[docs] def stop(self): """ stops the thread. :return: """ self.stopper.set()
[docs] def communicate_with_updater(self): """ communication member. This is meant to send some metadata to the updater thread. :return: """ try: data = dict( name=self.identifier, last_measure=self.current_capture_time.isoformat(), identifier=self.identifier, failed=self.failed ) self.communication_queue.append(data) self.failed = list() except Exception as e: self.logger.error("thread communication error: {}".format(str(e)))
[docs] def write_daily_rolling(self): """ writes full rolling daily daita files. :param rows: :return: """ try: fn = os.path.join(self.output_dir, "{}-daily".format(self.identifier)) csvf, tsvf, jsonf = fn + ".csv", fn + ".tsv", fn + ".json" with open(csvf, 'w', newline='') as csvfile, open(tsvf, 'w', newline='') as tsvfile, open(jsonf, 'w', newline='') as jsonfile: writer = csv.writer(csvfile, dialect=csv.excel) writer.writerow(("datetime", *self.data_headers)) writer.writerows(self.measurements) writer = csv.writer(tsvfile, dialect=csv.excel_tab) writer.writerow(("datetime", *self.data_headers)) writer.writerows(self.measurements) d = dict() for k in self.data_headers: d[k] = list() d['datetime'] = [] for measurement in self.measurements: for idx, m in enumerate(measurement[:len(d.keys())]): header = "datetime" if idx != 0: header = self.data_headers[idx - 1] d[header].append(m) jsonfile.write(json.dumps(d)) except Exception as e: self.logger.error("Error writing daily rolling data {}".format(str(e)))
[docs] def append_to_alltime(self, measurement: tuple): """ appends the measurement to the csv and tsv files. :param measurement: :return: """ try: fn = os.path.join(self.output_dir, "{}-lastday".format(self.identifier)) fn2 = os.path.join(self.output_dir, "{}-alltime".format(self.identifier)) csvf, tsvf = fn + ".csv", fn + ".tsv" csvf2, tsvf2 = fn2 + ".csv", fn2 + ".tsv" self.rotate(csvf, tsvf) def create_with_headers(path, delimiter=","): # write the headers if the files are new. if not os.path.exists(path): with open(path, 'w') as f: f.write(delimiter.join(("datetime", *self.data_headers)) + "\n") create_with_headers(csvf) create_with_headers(csvf2) create_with_headers(tsvf, delimiter='\t') create_with_headers(tsvf2, delimiter='\t') def append_measurement(fn, delimiter=","): with open(fn, 'a') as f: f.write(delimiter.join(str(x) for x in measurement) + "\n") append_measurement(csvf) append_measurement(csvf2) append_measurement(tsvf, delimiter='\t') append_measurement(tsvf2, delimiter='\t') except Exception as e: self.logger.error("Error appending measurement to the all time data: {}".format(str(e)))
[docs] def rotate(self, csvf, tsvf): def last_line(f): f.seek(-1024, 2) return f.readlines()[-1].decode() rotatecsv, rotatetsv = False, False try: with open(csvf, 'rb') as f: lastd = datetime.datetime.strptime(last_line(f).split(",")[0]) if lastd.day != datetime.date.today().day: rotatecsv = True if rotatecsv: shutil.move(csvf, csvf.replace("lastday", lastd.strftime(self.timestamp_format))) except: # cannot parse datetime because the last line is the header or file doesnt exist pass try: with open(tsvf, 'rb') as f: lastd = datetime.datetime.strptime(last_line(f).split("\t")[0]) if lastd.day != datetime.date.today().day: rotatetsv = True if rotatetsv: shutil.move(tsvf, tsvf.replace("lastday", lastd.strftime(self.timestamp_format))) except: # cannot parse datetime because the last line is the header or file doesnt exist pass
[docs] def run(self): """ run method. used for threaded sensors :return: """ while True and not self.stopper.is_set(): self.current_capture_time = datetime.datetime.now() # checking if enabled and other stuff if self.time_to_measure: try: self.logger.info("Capturing data for {}".format(self.identifier)) measurement = self.get_measurement() self.logger.info("Got Measurement {}".format(str(measurement))) try: telegraf_client = telegraf.TelegrafClient(host="localhost", port=8092) telegraf_client.metric("env_sensors", measurement) self.logger.debug("Communicated sensor data to telegraf") except Exception as exc: self.logger.error("Couldnt communicate with telegraf client. {}".format(str(exc))) # make ordered list of the data for writing. to disk. m = [measurement[k] for k in self.data_headers] self.measurements.append([self.current_capture_time.strftime(self.timestamp_format), *measurement]) self.append_to_alltime(self.measurements[-1]) self.write_daily_rolling() # self.communicate_with_updater() except Exception as e: self.logger.critical("Sensor data error - {}".format(str(e))) # make sure we cannot record twice. time.sleep(Sensor.accuracy * 2) time.sleep(0.1)
[docs] def get_measurement(self) -> dict: """ override this method with the method of collecting measurements from the sensor should return a dict :return: dict of measurements and their names :rtype: dict """ return dict()
""" TODO: make conviron "sensor" to do the monitoring in a more regular . """
[docs]class DHTMonitor(Sensor): """ Data logger class for DHT11, DHT22 & AM2302 GPIO temperature & humidity sensors from Adafruit. supply the identifier and the gpio pi that the sensor is connected to, along with the type of sensor. defaults to pin 14, DHT22 """ data_headers = ('humidity', "temperature") def __init__(self, identifier, pin: int = 14, sensor_type="AM2302", **kwargs): self.pin = pin sensor_args = { 11: Adafruit_DHT.DHT11, 22: Adafruit_DHT.DHT22, 2302: Adafruit_DHT.AM2302, "11": Adafruit_DHT.DHT11, "22": Adafruit_DHT.DHT22, "2302": Adafruit_DHT.AM2302, "DHT11": Adafruit_DHT.DHT11, "DHT22": Adafruit_DHT.DHT22, "AM2302": Adafruit_DHT.AM2302, } self.sensor_type = sensor_args.get(sensor_type, Adafruit_DHT.AM2302) super().__init__(identifier, **kwargs)
[docs] def get_measurement(self) -> dict: """ gets data from the DHT22 """ try: measurement = Adafruit_DHT.read_retry(self.sensor_type, self.pin) return {key: round_to_1dp(value) for key, value in zip(self.data_headers, measurement)} except Exception as e: self.logger.error("Couldnt get data, {}".format(str(e))) return {_: None for _ in self.data_headers}
from .Chamber import ConvironTelNetController
[docs]class ConvironChamberSensor(Sensor): data_headers = ("temp_set", "humidity_set", "temp_recorded", "humidity_recorded", "par") def __init__(self, identifier, config, *args, **kwargs): self.controller = ConvironTelNetController(config['telnet']) self.temperature_multiplier = config.get("temperature_multiplier", 10.0) super().__init__(identifier, **kwargs)
[docs] def get_measurement(self): measurement = dict() for _ in range(10): try: measurement.update(self.controller.get_values()) # collect chamber sensor metrics if type(measurement.get("temp_recorded")) is float: measurement['temp_recorded'] /= self.temperature_multiplier if type(measurement.get("temp_set")) is float: measurement['temp_set'] /= self.temperature_multiplier return measurement except Exception as e: traceback.print_exc() self.logger.warning("Couldnt collect chamber sensor metric retrying: {}".format(str(e))) print("Failed, retrying ({}/10)".format(_)) else: print("Totally failed getting chamber metrics") self.logger.error("Totally failed getting chamber metrics") return measurement
[docs]class SenseHatMonitor(Sensor): """ Data logger class for Astro Pi Sensehat No need to supply anything except the identifier as the SenseHad uses some kind of black sorcery to work it out. """ data_headers = ("temperature", "humidity", "pressure") def __init__(self, identifier: str = None, *args, **kwargs): self.sensehat = SenseHat() self.display_str = "Init Sensors..." self.sensehat.show_message(self.display_str) super().__init__(identifier, **kwargs)
[docs] def show_data(self, measurement): """ displays the data on the osd. :param measurement: meausrement to display :return: """ try: message_str = "T:{temperature:.2f} H:{humidity:.2f} P:{pressure:.2f}" self.sensehat.show_message(message_str.format(**measurement)) except Exception as e: self.logger.error(str(e))
[docs] def get_measurement(self) -> dict: """ get measurements for sensehat :return: """ try: measurement = [self.sensehat.temperature, self.sensehat.humidity, self.sensehat.pressure] return {key: round_to_1dp(value) for key, value in zip(self.data_headers, measurement)} except Exception as e: self.logger.error("Couldnt get data, {}".format(str(e))) return {_: None for _ in self.data_headers}
# # class ThreadedSensor(Sensor, Thread): # """ # threaded implementation of the sensor cclass. # """ # # def __init__(self, identifier, *args, **kwargs): # Thread.__init__(self, name=identifier) # self.daemon = True # print("Threaded started {}: {}".format(self.__class__, identifier)) # # # class ThreadedSenseHat(SenseHatMonitor, ThreadedSensor): # """ # threaded implementation for the AstroPI SenseHat # """ # # def __init__(self, identifier, *args, **kwargs): # super().__init__(identifier, *args, **kwargs) # super(ThreadedSensor, self).__init__(identifier, *args, **kwargs) # # # class ThreadedDHT(DHTMonitor, ThreadedSensor): # """ # threaded implementation for the Adafruit DHT/AM GPIO sensor module # """ # def __init__(self, identifier, *args, **kwargs): # super().__init__(identifier, *args, **kwargs) # super(ThreadedSensor, self).__init__(identifier, *args, **kwargs)