Source code for libs.Uploader

import datetime
import ftplib
import logging
import os
import time
from glob import glob
from collections import deque
from threading import Thread, Event
import pysftp
from .CryptUtil import SSHManager
from .SysUtil import SysUtil

try:
    logging.config.fileConfig("logging.ini")
    logging.getLogger("paramiko").setLevel(logging.WARNING)
except:
    pass


[docs]class Uploader(Thread): """ Uploader class, used to upload, """ # upload interval upload_interval = 120 remove_source_files = True def __init__(self, identifier: str, queue: deque = None): # same thread name hackery that the Camera threads use Thread.__init__(self, name=identifier + "-Uploader") self.stopper = Event() if queue is None: queue = deque(tuple(), 256) self.communication_queue = queue self.identifier = identifier self.logger = logging.getLogger(self.getName()) self.startup_time = datetime.datetime.now() self.ssh_manager = SSHManager() self.machine_id = SysUtil.get_machineid() self.last_upload_time = datetime.datetime.fromtimestamp(0) self.last_upload_list = [] self.total_data_uploaded_tb = 0 self.total_data_uploaded_b = 0 self.config_filename = SysUtil.identifier_to_ini(self.identifier) self.config = \ self.host = \ self.username = \ self.password = \ self.camera_name = \ self.source_dir = \ self.server_dir = \ self.upload_enabled = None self.re_init() SysUtil().add_watch(self.config_filename, self.re_init)
[docs] def re_init(self): """ setup to be run each time the config is reloaded """ self.machine_id = SysUtil.get_machineid() self.config = SysUtil.ensure_config(self.identifier) self.host = self.config["ftp"]["server"] self.username = self.config["ftp"]["username"] self.password = self.config["ftp"]["password"] self.server_dir = self.config["ftp"]["directory"] self.camera_name = self.config["camera"]["name"] self.source_dir = self.config["localfiles"]["upload_dir"] self.upload_enabled = self.config.getboolean("ftp", "enabled") self.last_upload_list = []
[docs] def upload(self, file_names): """ uploads files via sftp. deletes the files as they are uploaded, creates new directories if needed. :param file_names: filenames to upload """ try: self.logger.debug("Connecting sftp and uploading buddy") # open link and create directory if for some reason it doesnt exist params = dict(host=self.host, username=self.username) params['cnopts'] = pysftp.CnOpts(knownhosts=self.ssh_manager.known_hosts_path) params['cnopts'].hostkeys = None if self.password is not None: params['password'] = self.password if os.path.exists(self.ssh_manager.priv_path) and os.path.exists(self.ssh_manager.known_hosts_path): params['private_key'] = self.ssh_manager.priv_path params['cnopts'] = pysftp.CnOpts(knownhosts=self.ssh_manager.known_hosts_path) with pysftp.Connection(**params) as link: root = os.path.join(link.getcwd() or "", self.server_dir, self.camera_name) root = root[1:] if root.startswith("/") else root # make the root dir in case it doesnt exist. if not link.isdir(root): self.logger.debug("Making root directory") self.mkdir_recursive(link, root) link.chdir(root) root = os.path.join(link.getcwd()) self.logger.info(root) self.logger.debug("Uploading...") # dump ze files. total_time = time.time() total_size = 0 for idx, f in enumerate(file_names): try: onefile_time = time.time() target_file = f.replace(self.source_dir, "") target_file = target_file[1:] if target_file.startswith("/") else target_file dirname = os.path.dirname(target_file) if os.path.isdir(f): self.mkdir_recursive(link, target_file) continue if not link.isdir(dirname): self.mkdir_recursive(link, dirname) link.chdir(os.path.join(root, dirname)) link.put(f, os.path.basename(target_file) + ".tmp") if link.exists(os.path.basename(target_file)): link.remove(os.path.basename(target_file)) link.rename(os.path.basename(target_file) + ".tmp", os.path.basename(target_file)) link.chmod(os.path.basename(target_file), mode=755) self.total_data_uploaded_b += os.path.getsize(f) if self.remove_source_files: size = os.path.getsize(f) total_size += size mbps = (size/(time.time() - onefile_time))/1024/1024 os.remove(f) self.logger.debug( "Uploaded file {0}/{1} through sftp and removed from local filesystem, {2:.2f}Mb/s".format(idx, len(file_names), mbps)) else: self.logger.debug("Successfully uploaded {}/{} through sftp".format(idx, len(file_names))) self.last_upload_time = datetime.datetime.now() except Exception as e: self.logger.error("sftp:{}".format(str(e))) finally: link.chdir(root) mbps = (total_size/(time.time() - total_time))/1024/1024 self.logger.debug("Finished uploading, {0:.2f}Mb/s".format(mbps)) if self.total_data_uploaded_b > 1000000000000: curr = (((self.total_data_uploaded_b / 1024) / 1024) / 1024) / 1024 self.total_data_uploaded_b = 0 self.total_data_uploaded_tb = curr except Exception as e: # log a warning if fail because SFTP is meant to fail to allow FTP fallback self.logger.error("SFTP failed: {}".format(str(e))) self.logger.debug("Looks like I can't make a connection using sftp, eh. Falling back to ftp.") try: self.logger.debug("Connecting ftp") # open link and create directory if for some reason it doesnt exist ftp = ftplib.FTP(self.host) ftp.login(self.username, self.password) self.mkdir_recursive(ftp, os.path.join(self.server_dir, self.camera_name)) self.logger.info("Uploading") # dump ze files. for f in file_names: ftp.storbinary('stor ' + os.path.basename(f), open(f, 'rb'), 1024) os.remove(f) self.logger.debug("Successfuly uploaded {} through ftp and removed from local filesystem".format(f)) self.last_upload_time = datetime.datetime.now() except Exception as e: # log error if cant upload using FTP. FTP is last resort. self.logger.error(str(e))
[docs] def mkdir_recursive(self, link, remote_directory, mkdir=None, chdir=None): """ Creates directories recursively on the remote server :param link: ftp/sftp connection object :param remote_directory: :param chdir: method used to change to a directory :param mkdir: method used to make a directory """ if not (mkdir and chdir): if isinstance(link, pysftp.Connection): mkdir, chdir = link.mkdir, link.chdir elif isinstance(link, ftplib.FTP): mkdir, chdir = link.mkd, link.cwd try: if remote_directory in ('', "/"): return remote_dirname, basename = os.path.split(remote_directory) self.mkdir_recursive(link, os.path.dirname(remote_directory), mkdir=mkdir, chdir=chdir) try: chdir(basename) except IOError: self.logger.info("Sorry, just have to make some new directories, eh. ") mkdir(basename) chdir(basename) except Exception as e: self.logger.error("something went wrong making directories... {}".format(str(e)))
[docs] def communicate_with_updater(self): """ communication member. This is meant to send some metadata to the updater thread. """ if not self.communication_queue: return try: self.logger.debug("Collecting metadata") data = dict( identifier=self.identifier, uploaded=SysUtil.sizeof_fmt(self.total_data_uploaded_b), uploads=self.last_upload_list, last_upload=int(self.last_upload_time.strftime("%s")) ) self.communication_queue.append(data) except Exception as e: self.logger.error("inter thread communication error: {}".format(str(e)))
[docs] def run(self): """ run method. main loop for Uploaders. """ while True and not self.stopper.is_set(): try: upload_list = glob(os.path.join(self.source_dir, '**'), recursive=True) if len(upload_list) == 0: self.logger.info("No files in upload directory") if (len(upload_list) > 0) and self.upload_enabled: start_upload_time = time.time() self.logger.info("Preparing to upload %d files" % len(upload_list)) try: l_im = os.path.join(self.source_dir, "last_image.jpg") if l_im in upload_list: upload_list.insert(0, upload_list.pop(upload_list.index(l_im))) except Exception as e: self.logger.info( "Something went wrong sorting the last image to the front of the list: {}".format(str(e))) self.upload(upload_list) self.communicate_with_updater() self.logger.info( "Average upload time: {0:.2f}s".format((time.time() - start_upload_time) / len(upload_list))) self.logger.info("Total upload time: {0:.2f}s".format(time.time() - start_upload_time)) except Exception as e: self.logger.error("Unhandled exception in uploader run method: {}".format(str(e))) time.sleep(Uploader.upload_interval)
[docs] def stop(self): """ stopper method """ self.stopper.set()
[docs]class GenericUploader(Uploader): """ Generic uploader for uploading logs sensor data, etc. """ remove_source_files = True
[docs] def fill_me(self, dict_of_values: dict): """ fills self with values from a dict. :param dict_of_values: dictionary of key: values :type dict_of_values: dict """ for k, v in dict_of_values.items(): if hasattr(self, k): setattr(self, k, v)
def __init__(self, identifier: str, source_dir: str = None, host: str = None, config: dict = None, queue: deque = None): # same thread name hackery that the Camera threads use Thread.__init__(self, name=identifier + "-Uploader") self.stopper = Event() queue = queue if queue is not None else deque(tuple(), 256) self.communication_queue = queue self.identifier = identifier self.camera_name = identifier self.source_dir = source_dir self.logger = logging.getLogger(self.getName()) self.startup_time = datetime.datetime.now() self.ssh_manager = SSHManager() self.machine_id = SysUtil.get_machineid() self.last_upload_time = datetime.datetime.fromtimestamp(0) self.last_upload_list = [] self.total_data_uploaded_tb = 0 self.total_data_uploaded_b = 0 self.host = host or "sftp.traitcapture.org" self.upload_enabled = True self.username = "picam" self.password = None self.server_dir = "/" if config and type(config) is dict: self.camera_name = config.get("name", self.camera_name) self.source_dir = config.get("output_dir", self.source_dir) if type(config.get("upload")) is dict: config = config.get("upload") self.fill_me(config) self.upload_enabled = config.get("enabled", True)
[docs] def re_init(self): """ Your config is in another castle. """ self.last_upload_list = []