# bureau
"""Base class and decorators for ZeroMQ-driven "bureau" services.

A bureau binds a REP socket at ``ipc://.screenless/<prefix>.ipc`` and answers
messages of the form ``<ref>.<data>``, where ``<ref>`` names a registered
command or API method and ``<data>`` is an optional payload.
"""
import functools
import json
import os.path
import subprocess
import tempfile
import threading
import time

import zmq
from mako.template import Template


def update_commands(cls):
    """Class decorator: collect methods tagged with ``command`` on *cls*.

    Every attribute defined directly on *cls* that carries a ``command``
    attribute (see :func:`add_command`) is stored in ``cls.commands`` under
    that command string.  ``cls.commands`` is created if the class does not
    have one yet.  Returns *cls* so it can be used with ``@`` syntax.
    """
    # Snapshot first: adding ``commands`` below would otherwise change the
    # class namespace while it is being iterated.
    members = list(vars(cls).items())
    if not hasattr(cls, "commands"):
        cls.commands = {}
    for name, method in members:
        print("name %s method %s" % (name, method))
        if hasattr(method, "command"):
            cls.commands[method.command] = method
    return cls


def _tagging_decorator(attr, value, name):
    """Build a decorator that wraps a function and tags the wrapper.

    The wrapper gets ``<attr> = value`` and ``name = name``; those two
    attributes are what ``Bureau._publish_methods`` looks for.
    """
    def decorator(func):
        @functools.wraps(func)
        def func_wrap(*args, **kwargs):
            return func(*args, **kwargs)
        setattr(func_wrap, attr, value)
        func_wrap.name = name
        return func_wrap
    return decorator


def add_command(comstr, name=""):
    """Mark a method as a command reachable under the string *comstr*.

    *name* is published as the command's ``cmdname`` when the bureau
    registers itself.
    """
    return _tagging_decorator("command", comstr, name)


def add_api(apistr, name=""):
    """Mark a method as an API call reachable under the string *apistr*.

    *name* is published as the API's ``apiname`` when the bureau registers
    itself.  API methods receive their payload already decoded from JSON.
    """
    return _tagging_decorator("api", apistr, name)


# Base class for all bureaus: owns the ZeroMQ sockets, publishes the tagged
# methods and runs the message loop.
# NOTE: documented with comments on purpose. ``self.__doc__`` is sent to the
# "IR" bureau as this bureau's description, so giving the base class a
# docstring would change what gets published.
class Bureau:
    name = "TEST"
    prefix = "00"
    version = 0
    # NOTE: class-level dict, shared by every instance and subclass that does
    # not rebind it.
    config = {}

    def __init__(self):
        """Set up ZeroMQ connections and the command/API registries."""
        self.commands = {}
        self.api = {}

        # exist_ok avoids the check-then-create race between bureaus that
        # start at the same time.
        os.makedirs(".screenless", exist_ok=True)

        self.context = zmq.Context()
        self._recv = self.context.socket(zmq.REP)
        self._recv.bind("ipc://.screenless/" + self.prefix + ".ipc")
        print(("bureau " + self.name + " waiting for messages"))
        print("commands: ")
        print(self.commands)

    def send(self, recipient, message, data=None):
        """Send ``<message>.<json data>`` to another bureau and return its reply.

        *recipient* is the prefix of the target bureau (it selects the IPC
        endpoint), *message* the command/API string, and *data* an optional
        JSON-serialisable payload (omitted from the wire when falsy).

        Returns the reply decoded from JSON, or ``None`` when no reply
        arrived within the wait period or the reply is not valid JSON.
        """
        message += "."
        if data:
            message += json.dumps(data)

        sender = self.context.socket(zmq.REQ)
        try:
            sender.connect("ipc://.screenless/" + recipient + ".ipc")
            sender.send_string(message)
            # TODO: retry this a few times with a proper sleep/timeout
            time.sleep(0.5)
            try:
                reply = sender.recv(flags=zmq.NOBLOCK)
            except zmq.ZMQError:
                print("message sent but got no reply...")
                return None
            try:
                return json.loads(reply)
            except ValueError:
                # Replies produced by Bureau.run are plain text, not JSON;
                # do not let that crash the caller.
                print("got a reply that is not valid JSON: {!r}".format(reply))
                return None
        finally:
            # One socket is created per call, so release it explicitly.
            sender.close()

    def _publish_methods(self):
        """Register this bureau and all of its tagged methods with "IR".

        Fills ``self.commands`` and ``self.api`` with the bound methods that
        carry a ``command`` or ``api`` attribute and announces each one.
        """
        # register bureau with Inhuman Resources
        bureau_detail = {"name": self.name, "prefix": self.prefix,
                         "desc": self.__doc__}
        self.send("IR", "addbureau", bureau_detail)

        # find and store all published methods
        for member in dir(self):
            method = getattr(self, member)
            # ignore anything that is not a method with command or api details
            if not callable(method):
                continue
            if hasattr(method, "command"):
                self.commands[method.command] = method
                cmd_detail = {"cmdname": method.name, "prefix": self.prefix,
                              "cmd": method.command, "desc": method.__doc__}
                self.send("IR", "addcommand", cmd_detail)
            elif hasattr(method, "api"):
                self.api[method.api] = method
                api_detail = {"apiname": method.name, "prefix": self.prefix,
                              "api": method.api, "desc": method.__doc__}
                self.send("IR", "addapi", api_detail)

        print("registered:")
        print(self.commands)
        print(self.api)

    def print_full(self, template, **kwargs):
        """Print a full page (A4) document.

        Renders the Mako template file *template* with *kwargs*, writes the
        result to a temporary ``.tex`` file, runs ``xelatex`` on it in the
        file's directory and sends the resulting PDF to ``lpr``.  The
        ``printer`` keyword selects the lpr destination (default
        ``"default"``).  The temporary files are not removed.
        """
        # TODO: look up the printer LPR name
        lpname = kwargs.get("printer", "default")
        templ = Template(filename=template)
        rendered = templ.render_unicode(**kwargs).encode('utf-8', 'replace')

        # mkstemp returns an OS-level file descriptor, not a file object.
        texfd, texfilepath = tempfile.mkstemp(".tex")
        with os.fdopen(texfd, "wb") as texfile:
            texfile.write(rendered)

        texdir = os.path.dirname(texfilepath)
        # Argument lists (no shell): paths are passed through unmodified and
        # cwd replaces the former "cd <dir>;" prefix.
        subprocess.call(["xelatex", texfilepath], cwd=texdir)
        pdffile = texfilepath[0:-4] + ".pdf"
        subprocess.call(["lpr", "-P", lpname, pdffile])

    def print_small(self, text, printer="/dev/usb/lp1"):
        """Write *text* to the device file *printer*.

        Ten CR/LF pairs and the trailing sequence ``.d0`` are appended
        before writing.
        """
        text += "\r\n" * 10
        # NOTE(review): presumably a printer control sequence - confirm
        # against the printer's command set.
        text += ".d0"
        with open(printer, "w") as lp:
            lp.write(text)

    # Stupid test to see if modules work.
    # (Kept as a comment: a docstring here would be published as the
    # command's description.)
    @add_command("test")
    def test(self, data):
        print(("hi! testing. " + self.name + " bureau seems to work!"))
        return b"seems to work."

    def run_io(self):
        """Process hardware or timed input.

        This method can be ignored for most Bureaus. It should be overloaded
        for any services that need to independently generate messages, as
        this is just a placeholder. It is run in a separate thread and should
        handle any input or timed events. Messages are then sent to other
        Bureaus via the self.send connection to the OfficeManager. Don't
        forget to consider thread safety issues when accessing data!
        """
        pass

    @staticmethod
    def _split_message(msg):
        """Split ``"<ref>.<data>"`` into ``(ref, data)``.

        *data* is ``None`` when the payload or the reference is empty.  A
        message without any dot is taken to be a bare reference.
        """
        ref, _, payload = msg.partition(".")
        data = payload if (ref and payload) else None
        return ref, data

    @staticmethod
    def _encode_reply(ret):
        """Turn a handler result into the bytes sent back on the wire.

        ``None`` becomes an empty payload and ``str`` is UTF-8 encoded.  The
        result is prefixed with ``b"0"``; any other non-bytes type raises
        ``TypeError`` from the concatenation.
        """
        if ret is None:
            ret = b""
        elif isinstance(ret, str):
            ret = ret.encode("utf-8")
        return b"0" + ret

    def _dispatch(self, ref, data):
        """Call the handler registered for *ref* and return the reply bytes."""
        if ref not in self.commands and ref not in self.api:
            print("error! Command/API %s not found" % ref)
            return b"Error! Command/API not found."

        # catch TypeErrors for case of bogus params
        try:
            if ref in self.api:
                ret = self.api[ref](json.loads(data))
            else:
                ret = self.commands[ref](data)
            return self._encode_reply(ret)
        except (TypeError, ValueError) as e:
            # ValueError covers payloads that are not valid JSON.
            print(e)
            print("invalid data for command '{}': {}".format(ref, data))
            return b"Error. Invalid or missing data."
        except KeyError as e:
            print(e)
            print("You are calling a command as an API or vice-versa.")
            return b"Error. Command called as API or API as command."

    def run(self):
        """Main loop for processing messages.

        Starts ``run_io`` in its own thread, publishes the tagged methods and
        then answers incoming requests forever.  Every received request gets
        exactly one reply.
        """
        # start the hardware input handler
        io_thread = threading.Thread(target=self.run_io)
        io_thread.start()

        # register commands and api methods
        self._publish_methods()

        while True:
            try:
                msg = self._recv.recv_string(flags=zmq.NOBLOCK)
            except zmq.ZMQError:
                time.sleep(0.05)  # don't waste CPU
                continue

            ref, data = self._split_message(msg)
            print("data: " + str(data))
            print(("got method: " + ref))
            self._recv.send(self._dispatch(ref, data))


if __name__ == "__main__":
    bureau = Bureau()
    bureau.run()