# bureau
import configparser
import functools
import glob
import inspect
import io
import json
import logging
import os.path
import random
import re
import shutil
import string
import subprocess
import tempfile
import threading
import time

import lmdb
import PIL
import PIL.Image
import PIL.ImageOps
import weasyprint
import zmq
from escpos import printer
from mako.template import Template
from ruamel.yaml import YAML


# Default contents of printers.cfg. Values are strings because that is what
# configparser stores; they are converted when the config is loaded.
_PRINTER_DEFAULTS = {
    "smallprinter": {"vendorid": "", "productid": "", "inep": "",
                     "outep": "", "width": "384", "textwidth": "32"},
    "largeprinter": {"name": "", "papersize": "A4", "duplex": "False"},
}


def update_commands(cls):
    """ this is some internal magic to keep track of our commands """
    for name, method in cls.__dict__.items():
        print("name %s method %s" % (name, method))
        if hasattr(method, "command"):
            comstr = method.command
            cls.commands[comstr] = method
    return cls


def _tagging_decorator(attr, value, name, **extra):
    """ build a decorator that wraps a function and tags the wrapper.

    The wrapper gets `attr` set to `value`, a `name` attribute and any
    additional attributes given in `extra`. Wrapping (instead of tagging the
    function itself) avoids roaching the original function's namespace.
    """

    def decorator(func):
        """ the decorator itself """

        @functools.wraps(func)
        def func_wrap(*args, **kwargs):
            """ this is to avoid roaching the namespace """
            return func(*args, **kwargs)

        setattr(func_wrap, attr, value)
        func_wrap.name = name
        for key, val in extra.items():
            setattr(func_wrap, key, val)
        return func_wrap

    return decorator


def add_command(comstr, name=""):
    """ decorator for making a method into a command """
    return _tagging_decorator("command", comstr, name)


def add_api(apistr, name=""):
    """ decorator for making a method into a public bureau api method"""
    return _tagging_decorator("api", apistr, name)


def add_webview(webstr, name, in_menu=True):
    """ decorator for making a method available on the web-admin

    `in_menu` is recorded on the wrapped method as the `in_menu` attribute.
    """
    return _tagging_decorator("webview", webstr, name, in_menu=in_menu)


def _wrap_lines(text, width):
    """ split text into stripped lines of at most `width` characters.

    Lines are broken at whitespace where possible. A run of characters with no
    whitespace that is longer than `width` is hard-split instead of being
    silently truncated. `width` may be an int or a numeric string.
    """
    width = int(width)
    pattern = r".{1,%d}(?:\s+|$)|.{1,%d}" % (width, width)
    return [line.strip() for line in re.findall(pattern, text)]


def _open_usb_printer(cfg):
    """ open the escpos USB printer described by a small-printer config dict.

    The endpoints are only passed on when at least one of them is configured.
    """
    if cfg["inep"] is None and cfg["outep"] is None:
        return printer.Usb(cfg["vendorid"], cfg["productid"])
    return printer.Usb(cfg["vendorid"], cfg["productid"],
                       in_ep=cfg["inep"], out_ep=cfg["outep"])


class LogPrinter(logging.Handler):
    """
    LogPrinter prints logs on a receipt printer for screenless debugging.
    """

    def __init__(self, log_printer):
        """ log_printer: the small-printer config dict (see Bureau.smprint) """
        logging.Handler.__init__(self)
        self.printer = log_printer

    def emit(self, record):
        """ format the record, wrap it to the paper width, print and cut.

        Printing problems are routed through handleError() so that a missing
        or broken printer never crashes the code that merely tried to log.
        """
        # no printer configured: nothing to print on
        if (self.printer.get("vendorid") is None
                or self.printer.get("productid") is None):
            return
        try:
            prn = _open_usb_printer(self.printer)
            lines = _wrap_lines(self.format(record), self.printer["textwidth"])
            prn.text("\r\n".join(lines) + "\r\n" * 4)
            prn.cut()
        except Exception:
            self.handleError(record)


class KeyValStore:
    """
    A KeyValStore is a simple wrapper for LMDB flat file storage. It's very
    fast and simple for large databases with small (less than 4kb) entries.
    If you need something larger try the filesystem. If you need more
    structure or indexes try sqlite.
    Keys and values MUST BE UNICODE STRINGS!
    """

    def __init__(self, env, name):
        self.env = env
        self.db = env.open_db(name.encode())

    def store(self, key, val):
        """
        Store a key-val pair.
        Returns True on success.
        """
        with self.env.begin(write=True, db=self.db) as txn:
            return txn.put(key.encode(), val.encode())

    def store_and_get_shortcode(self, val):
        """
        Find an un-used shortcode and use it as a key to store the value
        given. Note, each db is limited to about a billion keys so don't go
        too crazy.
        returns a 5-char shortcode string.
        """
        alphabet = string.ascii_letters + string.digits
        # we only have about a billion so make sure we don't collide keys
        with self.env.begin(write=True, db=self.db) as txn:
            while True:
                code = "".join(random.choice(alphabet) for _ in range(5))
                if txn.get(code.encode()) is None:
                    break
            txn.put(code.encode(), val.encode())
        return code

    def get(self, key):
        """
        Look up a value.
        Returns value as a unicode string or None if nonexistent.
        """
        with self.env.begin(db=self.db) as txn:
            res = txn.get(key.encode())
        if res is None:
            return None
        return res.decode("utf-8")

    def delete(self, key):
        """
        Delete a key-val pair.
        Returns True on success.
        """
        with self.env.begin(write=True, db=self.db) as txn:
            return txn.delete(key.encode())


class Bureau:
    """ Bureau is a base class that implements standard methods for
    inter-bureau communication, IO, registration and some convenient stuff
    for printing. """

    name = "TEST"
    prefix = "00"
    version = 0
    default_config = {}

    def __init__(self):
        """ set up ZeroMQ connections, printers, fonts and register commands"""
        self.commands = {}
        self.api = {}
        self.webviews = {}

        mypath = inspect.getfile(self.__class__)
        self.mdir = os.path.dirname(mypath)

        # all state lives in ~/.screenless, which also becomes the working
        # directory (config files and ipc sockets use relative paths)
        basepath = os.path.expanduser("~/.screenless")
        os.makedirs(basepath, exist_ok=True)
        os.chdir(basepath)

        # make sure fonts are copied
        fontpath = os.path.join(basepath, "fonts")
        fontsrc = os.path.join(self.mdir, "fonts")
        if os.path.exists(fontsrc):
            # the target must exist as a directory, otherwise shutil.copy
            # would write a plain file called "fonts"
            os.makedirs(fontpath, exist_ok=True)
            fontlist = glob.glob(os.path.join(fontsrc, "*.otf"))
            fontlist += glob.glob(os.path.join(fontsrc, "*.ttf"))
            for fontfile in fontlist:
                fontbase = os.path.basename(fontfile)
                if not os.path.exists(os.path.join(fontpath, fontbase)):
                    shutil.copy(fontfile, fontpath)

        self.load_config()
        self._load_printer_config()

        # setup log file
        if "debug" in self.config and self.config["debug"]:
            log_level = logging.DEBUG
        else:
            log_level = logging.ERROR
        logfile = os.path.join(basepath, self.prefix + ".log")
        logging.basicConfig(filename=logfile, level=log_level)
        self.log = logging.getLogger(self.prefix)

        log_printer = LogPrinter(self.smprint)
        log_format = logging.Formatter('LOG ${levelname} $name: $message',
                                       style='$')
        log_printer.setFormatter(log_format)
        self.log.addHandler(log_printer)

        # setup a dir to store files and data
        self.datadir = os.path.join(basepath, self.prefix)
        os.makedirs(self.datadir, exist_ok=True)

        # set up a basic key-value store with LMDB - max 10 sub-dbs
        # if a bureau needs more then just import lmdb and roll your own
        dbfile = os.path.join(self.datadir, self.prefix + ".lmdb")
        self.dbenv = lmdb.open(dbfile, max_dbs=10)

        self.context = zmq.Context()
        self._recv = self.context.socket(zmq.REP)
        self._recv.bind("ipc://" + self.prefix + ".ipc")
        self.log.debug("bureau " + self.name + " waiting for messages")
        self.log.debug("commands: ")
        self.log.debug(str(self.commands))

    def _load_printer_config(self):
        """ read printers.cfg into self.smprint and self.lp.

        Missing sections or keys are filled in with defaults and written back
        to the file; values that are already present are left untouched.
        Hex ids/endpoints become ints (or None when blank), widths become
        ints and duplex becomes a bool.
        """
        printcfg = configparser.ConfigParser()
        printcfg.read("printers.cfg")

        changed = False
        for section, defaults in _PRINTER_DEFAULTS.items():
            if not printcfg.has_section(section):
                printcfg.add_section(section)
                changed = True
            for key, default in defaults.items():
                if not printcfg.has_option(section, key):
                    printcfg.set(section, key, default)
                    changed = True
        if changed:
            with open("printers.cfg", "w") as print_conf_file:
                printcfg.write(print_conf_file)

        small = printcfg["smallprinter"]
        self.smprint = {}
        for hexkey in ("vendorid", "productid", "inep", "outep"):
            val = small[hexkey]
            self.smprint[hexkey] = int(val, 16) if val else None
        self.smprint["width"] = int(small["width"])
        self.smprint["textwidth"] = int(small["textwidth"])

        large = printcfg["largeprinter"]
        self.lp = {
            "name": large["name"],
            "papersize": large["papersize"],
            "duplex": large.getboolean("duplex"),
        }

    def open_db(self, name):
        """
        Loads and if not yet existing, creates, an LMDB database
        returns a KeyValStore object
        """
        return KeyValStore(self.dbenv, name)

    def load_config(self):
        """
        load (or reload) config data from file

        The file <prefix>.yml is created from default_config when missing.
        An empty file results in an empty dict.
        """
        yaml = YAML()
        cfgfile = self.prefix + ".yml"

        if not os.path.exists(cfgfile):
            with open(cfgfile, "w") as configfile:
                yaml.dump(self.default_config, configfile)

        with open(cfgfile) as cfp:
            self.config = yaml.load(cfp)
        if self.config is None:
            self.config = {}

    def send(self, recipient, message, data=None):
        """
        send commands or API calls to another bureau.
        recipient: the 2-character bureau ID
        message: a text based message as used in many commands
        data: an optional dict, used in API calls
        returns None on timeout or empty reply, a decoded json object for
        successful replies carrying a payload, or the raw reply text for
        anything else (e.g. error messages)
        """
        # clean up for sloppy offices
        message = message.strip()
        if not message.endswith("."):
            message += "."
        if data:
            message += json.dumps(data)

        sender = self.context.socket(zmq.REQ)
        try:
            sender.connect("ipc://" + recipient + ".ipc")
            sender.send_string(message)
            if sender.poll(timeout=10000) == 0:
                self.log.warning(
                    "message %s sent... timed out after 10 seconds.", message)
                return None

            resp = sender.recv_string()
            if not resp:
                return None
            if resp[0] == "0":
                # "0" marks success, anything after it is a json payload
                if len(resp) == 1:
                    return None
                return json.loads(resp[1:])
            # TODO: this may need some better error handling
            return resp
        finally:
            # linger=0 drops an unanswered request instead of blocking
            # context shutdown; closing avoids leaking a socket per call
            sender.close(linger=0)

    def _register(self, local_method, remote_message, detail):
        """ hand a registration record to Inhuman Resources.

        The IR bureau itself calls its own method directly to avoid messy
        self-reference and chicken-egg waiting; everyone else sends a message.
        """
        if self.prefix == "IR":
            getattr(self, local_method)(detail)
        else:
            self.send("IR", remote_message, detail)

    def _publish_methods(self):
        """
        this internal method registers all public commands and
        bureau API methods. Inhuman Resources module can then display
        menus and docs.
        """
        # register bureau with Inhuman Resources
        bureau_detail = {"name": self.name, "prefix": self.prefix,
                         "desc": self.__doc__}
        self._register("add_bureau", "addbureau", bureau_detail)

        # (tag attribute, registry, name key, value key,
        #  IR-local method, message sent to IR)
        kinds = (
            ("command", self.commands, "cmdname", "cmd",
             "add_cmd", "addcommand"),
            ("api", self.api, "apiname", "api",
             "add_api_method", "addapi"),
            ("webview", self.webviews, "name", "webview",
             "add_webview", "addweb"),
        )

        # find and store all published methods
        for member in dir(self):
            method = getattr(self, member)
            if not callable(method):
                continue
            for attr, registry, name_key, value_key, local, remote in kinds:
                if not hasattr(method, attr):
                    continue
                value = getattr(method, attr)
                registry[value] = method
                detail = {name_key: method.name, "prefix": self.prefix,
                          value_key: value, "desc": method.__doc__}
                self._register(local, remote, detail)
                # a method is published as the first kind that matches only
                break

        self.log.debug("registered:")
        self.log.debug(str(self.commands))
        self.log.debug(str(self.api))

    def _get_small_printer(self):
        """
        returns an instance of the small escpos printer
        """
        return _open_usb_printer(self.smprint)

    def print_full(self, template, **kwargs):
        """
        print a full page (A4) document

        template: path of a mako template, relative to the bureau's module dir
        kwargs: variables handed to the template
        """
        # TODO: look up the printer LPR name / allow multiple printers/non-default
        # run template with kwargs
        templfile = os.path.join(self.mdir, template)
        self.log.debug("printing with template: %s", templfile)
        templ = Template(filename=templfile, input_encoding="utf-8",
                         strict_undefined=True)
        html_source = templ.render_unicode(**kwargs)

        # use an URL fetcher that waits 30s instead of 10s
        def slowfetch(url, timeout=30, ssl_context=None):
            if len(url) < 50:
                print("fetching", url)
            start = time.time()
            ret = weasyprint.default_url_fetcher(url, 30, ssl_context)
            elapsed = time.time() - start
            if len(url) < 50:
                print("fetched", url, "in", elapsed)
            return ret

        html_rendered = weasyprint.HTML(url_fetcher=slowfetch,
                                        string=html_source)
        if self.log.getEffectiveLevel() == logging.DEBUG:
            with open("/tmp/debug.html", "w") as html_out:
                html_out.write(html_source)
            self.log.debug("debug html output at /tmp/debug.html")

        # TODO: make paper size a config variable
        pdf_fd, pdfpath = tempfile.mkstemp(".pdf")
        os.close(pdf_fd)  # weasyprint reopens the file by path
        try:
            self.log.debug("rendering to: " + pdfpath)
            html_rendered.write_pdf(pdfpath)
            # TODO: make this asynchronous
            subprocess.call(["lpr", "-o", "sides=two-sided-long-edge",
                             "-o", "InputSlot=Upper", pdfpath])
        finally:
            # lpr has submitted the job by the time it returns, so the
            # temporary file is no longer needed
            try:
                os.remove(pdfpath)
            except OSError:
                pass

    def print_small(self, text, cut=True, linefeed=True):
        """
        print on Thermal Line printer.

        text: the text to print, wrapped to the configured text width
        cut: cut the paper after printing
        linefeed: add two blank lines after the text
        """
        prn = self._get_small_printer()
        lines = _wrap_lines(text, self.smprint["textwidth"])
        prn.text("\r\n".join(lines) + "\r\n")
        if linefeed:
            prn.text("\r\n\r\n")
        if cut:
            prn.cut()

    def print_small_image(self, img, linefeed=True):
        """
        print an image on the mini thermal printer.

        img: a PIL image or anything PIL.Image.open() accepts
        linefeed: add a line break after the image
        """
        prn = self._get_small_printer()
        if isinstance(img, PIL.Image.Image):
            im = img
        else:
            im = PIL.Image.open(img)

        # NOTE: might be worth trying to push up brightness
        im = PIL.ImageOps.equalize(im)  # stretch histogram for nicer dither
        # resize to fit printer (LANCZOS is what ANTIALIAS used to alias)
        im.thumbnail((self.smprint["width"], 1024), PIL.Image.LANCZOS)
        # not using this bitImageColumn crashes some printers, sigh
        prn.image(im, impl="bitImageColumn")
        if linefeed:
            prn.text("\r\n")

    @add_command("test")
    def test(self, data=None):
        """
        Standard test command.
        """
        # stupid test to see if modules work
        print("hi! testing. " + self.name + " bureau seems to work!")
        return "seems to work."

    def _run_io(self):
        """ wrapper for run_io so that we can catch threaded exceptions and log
        """
        try:
            self.run_io()
        except Exception as err:
            self.log.exception("%s CRASHED with %s\n", self.name, err)
            raise

    def run_io(self):
        """process hardware or timed input

        This method can be ignored for most Bureaus.
        It should be overloaded for any services that need to independently
        generate messages as this is just a placeholder.

        It is run in a separate thread and should handle any input or
        timed events. Messages are then sent to other Bureaus via the
        self.send connection to the OfficeManager. Don't forget to
        consider thread safety issues when accessing data!
        """
        pass

    def run(self):
        """
        wrapper running the main loop and logging all exceptions
        """
        try:
            self._run()
        except Exception as err:
            self.log.exception("%s CRASHED with %s\n", self.name, err)
            raise

    def _handle_message(self, msg):
        """ dispatch one incoming message and return the reply string.

        A message is "<ref>.<data>" where data is optional. API and webview
        handlers get json-decoded data and their result is json-encoded;
        command handlers get and return plain text. Successful replies are
        prefixed with "0".
        """
        self.log.debug("got message:" + msg)
        # TODO: maybe trim off the trailing "." for convenience
        ref, _, data = msg.partition(".")
        data = data or None
        self.log.debug("got method: " + ref)

        if ref in self.api:
            handler, uses_json = self.api[ref], True
        elif ref in self.webviews:
            handler, uses_json = self.webviews[ref], True
        elif ref in self.commands:
            handler, uses_json = self.commands[ref], False
        else:
            self.log.warning("error! Command/API %s not found", ref)
            return "Error! Command/API not found."

        if uses_json:
            result = handler(json.loads(data)) if data else handler()
            ret = json.dumps(result)
        else:
            ret = handler(data) if data else handler()
            if ret is None:
                ret = ""
        return "0" + ret

    def _run(self):
        """
        main loop for processing messages

        This runs all relevant event processing loops.
        """
        # start the hardware input handler
        io_handler = threading.Thread(target=self._run_io, daemon=True)
        io_handler.start()

        # register commands and api methods
        self._publish_methods()

        poller = zmq.Poller()
        poller.register(self._recv, zmq.POLLIN)
        while True:
            events = dict(poller.poll(500))
            if events.get(self._recv) != zmq.POLLIN:
                continue
            msg = self._recv.recv_string(flags=zmq.NOBLOCK)

            # a REP socket must answer every request, so a failing handler
            # produces an error reply instead of taking the bureau down
            try:
                reply = self._handle_message(msg)
            except Exception as err:
                self.log.exception("handling message %r failed: %s", msg, err)
                reply = "Error! Command/API failed: %s" % err
            self._recv.send_string(reply)


def main():
    """ run a bare Bureau, mostly useful for testing """
    buro = Bureau()
    buro.run()


if __name__ == "__main__":
    main()