You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

529 lines
18 KiB
Python

# bureau
import configparser
import functools
import glob
import inspect
import io
import json
import logging
import os.path
import random
import shutil
import string
import subprocess
import tempfile
import textwrap
import threading
import lmdb
import PIL
import weasyprint
import zmq
from escpos import printer
from mako.template import Template
from ruamel.yaml import YAML
def update_commands(cls):
""" this is some internal magic to keep track of our commands """
for name, method in cls.__dict__.items():
print("name %s method %s" % (name, method))
if hasattr(method, "command"):
comstr = method.command
cls.commands[comstr] = method
return cls
def add_command(comstr, name=""):
""" decorator for making a method into a command """
def decorator(func):
""" the decorator itself """
@functools.wraps(func)
def func_wrap(*args, **kwargs):
""" this is to avoid roaching the namespace """
return func(*args, **kwargs)
func_wrap.command = comstr
func_wrap.name = name
return func_wrap
return decorator
def add_api(apistr, name=""):
""" decorator for making a method into a public bureau api method"""
def decorator(func):
""" the decorator itself """
@functools.wraps(func)
def func_wrap(*args, **kwargs):
""" this is to avoid roaching the namespace """
return func(*args, **kwargs)
func_wrap.api = apistr
func_wrap.name = name
return func_wrap
return decorator
class LogPrinter(logging.Handler):
"""
LogPrinter prints logs on a receipt printer for screenless debugging.
"""
def __init__(self, log_printer):
self.printer = log_printer
logging.Handler.__init__(self)
def emit(self, record):
if (self.printer["inep"] is None) and (self.printer["outep"] is None):
prn = printer.Usb(self.printer["vendorid"], self.printer["productid"])
else:
prn = printer.Usb(self.printer["vendorid"],
self.printer["productid"],
in_ep=self.printer["inep"],
out_ep=self.printer["outep"])
msg = self.format(record)
text = textwrap.fill(msg, width=self.printer["textwidth"])
text += "\r\n" * 4
prn.text(text)
prn.cut()
class KeyValStore(object):
"""
A KeyValStore is a simple wrapper for LMDB flat file storage. It's very
fast and simple for large databases with small (less than 4kb) entries.
If you need something larger try the filesystem. If you need more structure
or indexes try sqlite. Keys and values MUST BE UNICODE STRINGS!
"""
def __init__(self, env, name):
self.env = env
self.db = env.open_db(name.encode())
def store(self, key, val):
"""
Store a key-val pair.
Returns True on success.
"""
with self.env.begin(write=True, db=self.db) as txn:
ret = txn.put(key.encode(), val.encode())
return ret
def store_and_get_shortcode(self, val):
"""
Find an un-used shortcode and use it as a key to store the value given.
Note, each db is limited to about a billion keys so don't go too crazy.
returns a 5-char shortcode string.
"""
def _shortcode():
# returns a random 5-char string
return ''.join(random.choice(string.ascii_letters + string.digits)
for _ in range(5))
# we only have about a billion so make sure we don't collide keys
with self.env.begin(write=True, db=self.db) as txn:
res = "not None"
while res is not None:
tmpcode = _shortcode()
res = txn.get(tmpcode.encode())
txn.put(tmpcode.encode(), val.encode())
return tmpcode
def get(self, key):
"""
Look up a value.
Returns value as a unicode string or None if nonexistent.
"""
with self.env.begin(db=self.db) as txn:
res = txn.get(key.encode())
return res.decode("utf-8")
def delete(self, key):
"""
Delete a key-val pair.
Returns True on success.
"""
with self.env.begin(write=True, db=self.db) as txn:
ret = txn.delete(key.encode)
return ret
class Bureau(object):
""" Bureau is a base class that implements standard methods for
inter-bureau communication, IO, registration and some convenient stuff
for printing. """
name = "TEST"
prefix = "00"
version = 0
default_config = {}
def __init__(self):
""" set up ZeroMQ connections, printers, fonts and register commands"""
self.commands = {}
self.api = {}
mypath = inspect.getfile(self.__class__)
self.mdir = os.path.dirname(mypath)
basepath = os.path.expanduser("~/.screenless")
if not os.path.exists(basepath):
os.mkdir(basepath)
os.chdir(basepath)
# make sure fonts are copied
fontpath = os.path.join(basepath, "fonts")
fontsrc = os.path.join(self.mdir, "fonts")
if os.path.exists(fontsrc):
fontlist = glob.glob(os.path.join(fontsrc, "*.otf"))
fontlist += glob.glob(os.path.join(fontsrc, "*.ttf"))
for fontfile in fontlist:
fontbase = os.path.basename(fontfile)
if not os.path.exists(os.path.join(fontpath, fontbase)):
shutil.copy(fontfile, fontpath)
self.load_config()
# load printer configs
printcfg = configparser.ConfigParser()
try:
printcfg.read("printers.cfg")
self.smprint = {}
for hexkey in ["vendorid", "productid", "inep", "outep"]:
val = printcfg["smallprinter"][hexkey]
if val == "":
self.smprint[hexkey] = None
else:
self.smprint[hexkey] = int(val, 16)
self.smprint["width"] = int(printcfg["smallprinter"]["width"])
self.smprint["textwidth"] = int(printcfg["smallprinter"]["textwidth"])
self.lp = {}
self.lp["name"] = printcfg["largeprinter"]["name"]
self.lp["papersize"] = printcfg["largeprinter"]["papersize"]
self.lp["duplex"] = printcfg["largeprinter"].getboolean("duplex")
except KeyError:
# TODO: eventually refactor this since it could overwrite a half-broken config
sp_dict = {"vendorid": "", "productid": "", "inep": "",
"outep": "", "width": "384", "textwidth": "32"}
lp_dict = {"name": "", "papersize": "A4", "duplex": "False"}
printcfg["smallprinter"] = sp_dict
printcfg["largeprinter"] = lp_dict
with open("printers.cfg", "w") as print_conf_file:
printcfg.write(print_conf_file)
self.smprint = sp_dict
self.lp = lp_dict
# setup log file
if "debug" in self.config:
if self.config["debug"]:
log_level = logging.DEBUG
else:
log_level = logging.ERROR
else:
log_level = logging.ERROR
logfile = os.path.join(basepath, self.prefix + ".log")
logging.basicConfig(filename=logfile, level=log_level)
self.log = logging.getLogger(self.prefix)
log_printer = LogPrinter(self.smprint)
log_format = logging.Formatter('LOG ${levelname} $name: $message', style='$')
log_printer.setFormatter(log_format)
self.log.addHandler(log_printer)
# setup a dir to store files and data
self.datadir = os.path.join(basepath, self.prefix)
if not os.path.exists(self.datadir):
os.mkdir(self.datadir)
# set up a basic key-value store with LMDB - max 10 sub-dbs
# if a bureau needs more then just import lmdb and roll your own
dbfile = os.path.join(self.datadir, self.prefix + ".lmdb")
self.dbenv = lmdb.open(dbfile, max_dbs=10)
self.context = zmq.Context()
self._recv = self.context.socket(zmq.REP)
self._recv.bind("ipc://" + self.prefix + ".ipc")
self.log.debug("bureau " + self.name + " waiting for messages")
self.log.debug("commands: ")
self.log.debug(str(self.commands))
# def _log_exception(typ, value, tb):
# self.log.error("CRASH TRACE: {0}".format(str(value)), exc_info=(typ, value, tb))
# sys.__excepthook__(typ, value, tb)
def open_db(self, name):
"""
Loads and if not yet existing, creates, an LMDB database
returns a KeyValStore object
"""
db = KeyValStore(self.dbenv, name)
return db
def load_config(self):
"""
load (or reload) config data from file
"""
yaml = YAML()
cfgfile = self.prefix + ".yml"
if not os.path.exists(cfgfile):
with open(cfgfile, "w") as configfile:
yaml.dump(self.default_config, configfile)
with open(cfgfile) as cfp:
self.config = yaml.load(cfp)
def send(self, recipient, message, data=None):
"""
send commands or API calls to another bureau.
recipient: the 2-character bureau ID
message: a text based message as used in many commands
data: an optional dict, used in API calls
returns either empty string, text or a json object
"""
# clean up for sloppy offices
message = message.strip()
if not message.endswith("."):
message += "."
if data:
message += json.dumps(data)
sender = self.context.socket(zmq.REQ)
sender.connect("ipc://" + recipient + ".ipc")
sender.send_string(message)
events = sender.poll(timeout=10000)
if events is not 0:
resp = sender.recv_string()
if len(resp) == 0:
return None
elif resp[0] == "0":
if len(resp) == 1:
return None
else:
return json.loads(resp[1:])
else:
# TODO: this may need some better error handling
return resp
else:
self.log.warning("message" + message +
" sent... timed out after 10 seconds.")
return None
def _publish_methods(self):
"""
this internal method registers all public commands and bureau API
methods. Inhuman Resources module can then display menus and docs.
"""
# register bureau with Inhuman Resources
bureau_detail = {"name": self.name, "prefix": self.prefix,
"desc": self.__doc__}
# slight hack to avoid messy self-reference and chicken-egg waiting
if self.prefix == "IR":
method = getattr(self, "add_bureau")
method(bureau_detail)
else:
self.send("IR", "addbureau", bureau_detail)
# find and store all published methods
for member in dir(self):
method = getattr(self, member)
# ignore anything that is not a method with command or api details
if not (callable(method) and (hasattr(method, "command") or
hasattr(method, "api"))):
continue
if hasattr(method, "command"):
self.commands[method.command] = method
cmd_detail = {"cmdname": method.name,
"prefix": self.prefix,
"cmd": method.command,
"desc": method.__doc__}
if self.prefix == "IR":
method = getattr(self, "add_cmd")
method(cmd_detail)
else:
self.send("IR", "addcommand", cmd_detail)
elif hasattr(method, "api"):
self.api[method.api] = method
api_detail = {"apiname": method.name,
"prefix": self.prefix,
"api": method.api,
"desc": method.__doc__}
if self.prefix == "IR":
method = getattr(self, "add_api_method")
method(api_detail)
else:
self.send("IR", "addapi", api_detail)
self.log.debug("registered:")
self.log.debug(str(self.commands))
self.log.debug(str(self.api))
def _get_small_printer(self):
"""
returns an instance of the small escpos printer
"""
if (self.smprint["inep"] is None) and (self.smprint["outep"] is None):
prn = printer.Usb(self.smprint["vendorid"], self.smprint["productid"])
else:
prn = printer.Usb(self.smprint["vendorid"],
self.smprint["productid"],
in_ep=self.smprint["inep"],
out_ep=self.smprint["outep"])
return prn
def print_full(self, template, **kwargs):
"""print a full page (A4) document """
# TODO: look up the printer LPR name / allow multiple printers/non-default
# run template with kwargs
templfile = os.path.join(self.mdir, template)
self.log.debug("printing with template: %s", templfile)
templ = Template(filename=templfile, strict_undefined=True)
# TODO: make paper size a config variable
pdfpath = tempfile.mkstemp(".pdf")[1]
self.log.debug("rendering to: " + pdfpath)
weasyprint.HTML(string=templ.render_unicode(**kwargs)).write_pdf(pdfpath)
subprocess.call("lpr -o sides=two-sided-long-edge -o InputSlot=Upper " + pdfpath, shell=True)
# TODO: make this asynchronous
def print_small(self, text, cut=True):
"""
print on Thermal Line printer.
"""
prn = self._get_small_printer()
text = textwrap.fill(text, width=self.smprint["textwidth"])
text += "\r\n" * 2
prn.text(text + "\r\n\r\n")
if cut:
prn.cut()
def print_small_image(self, img):
"""
print an image on the mini thermal printer.
"""
prn = self._get_small_printer()
if type(img) is PIL.Image.Image:
im = img
else:
im = PIL.Image.open(img)
# NOTE: might be worth tring to push up brightness
im = PIL.ImageOps.equalize(im) # stretch histogram for nicer dither
# resize to fit printer
im.thumbnail((self.smprint["width"], 1024), PIL.Image.ANTIALIAS)
# not using this bitImageColumn crashes some printers, sigh
prn.image(im, impl="bitImageColumn")
@add_command("test")
def test(self, data=None):
"""
Standard test command.
"""
# stupid test to see if modules work
print(("hi! testing. " + self.name + " bureau seems to work!"))
return "seems to work."
def _run_io(self):
"""
wrapper for run_io so that we can catch threaded exceptions and log
"""
try:
self.run_io()
except Exception as err:
self.log.exception("%s CRASHED with %s\n", self.name, err)
raise
def run_io(self):
"""process hardware or timed input
This method can be ignored for most Bureaus.
It should be overloaded for any services that need to independently
generate messages as this is just a placeholder.
It is run in a separate thread and should handle any input or
timed events. Messages are then sent to other Bureaus via the
self.send connection to the OfficeManager. Don't forget to
to consier thread safety issues when accessing data!
"""
pass
def run(self):
"""
wrapper running the main loop and logging all exceptions
"""
try:
self._run()
except Exception as err:
self.log.exception("%s CRASHED with %s\n", self.name, err)
raise
def _run(self):
"""
main loop for processing messages
This runs all relelvant event processing loops.
"""
# start the hardware input handler
io_handler = threading.Thread(target=self._run_io)
io_handler.start()
# register commands and api methods
self._publish_methods()
poller = zmq.Poller()
poller.register(self._recv, zmq.POLLIN)
while True:
msgs = dict(poller.poll(500))
if msgs:
if msgs.get(self._recv) == zmq.POLLIN:
msg = self._recv.recv_string(flags=zmq.NOBLOCK)
else:
continue
try:
self.log.debug("got message:" + msg)
dot = msg.find(".")
ref = msg[:dot]
if (dot < len(msg) - 1) and (dot > 0):
self.log.debug("msg length: %d", len(msg))
self.log.debug("dot at %d", dot)
# TODO: maybe trim off the trailing "." for convenience
data = msg[dot + 1:]
else:
data = None
self.log.debug("data: " + str(data))
except IndexError as err:
self.log.warning("invalid message: %s", err)
continue
self.log.debug("got method: " + ref)
if (ref in self.commands) or (ref in self.api):
# TODO: cope with data for calls with no params
if ref in self.api:
if data:
data = json.loads(data)
ret = json.dumps(self.api[ref](data))
else:
ret = json.dumps(self.api[ref]())
else:
if data:
ret = self.commands[ref](data)
else:
ret = self.commands[ref]()
if ret is None:
ret = ""
ret = "0" + ret
self._recv.send_string(ret)
else:
self.log.warning("error! Command/API %s not found", ref)
self._recv.send_unicode("Error! Command/API not found.")
def main():
buro = Bureau()
buro.run()
if __name__ == "__main__":
main()