From 88b9d4801530ac5a14ca6c1f852e24251f1efdbc Mon Sep 17 00:00:00 2001 From: Brendan Howell Date: Sat, 13 Feb 2016 00:10:26 +0100 Subject: [PATCH] fixed ipc problems. added some error response handlers/messages. --- screenless/bureau/bureau.py | 89 +++++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 34 deletions(-) diff --git a/screenless/bureau/bureau.py b/screenless/bureau/bureau.py index 60bd508..8ebccd4 100644 --- a/screenless/bureau/bureau.py +++ b/screenless/bureau/bureau.py @@ -5,6 +5,7 @@ import os.path import subprocess import tempfile import threading +import time import zmq from mako.template import Template @@ -41,7 +42,6 @@ def add_api(apistr, name=""): return decorator -# @update_commands class Bureau: name = "TEST" prefix = "00" @@ -53,31 +53,38 @@ class Bureau: """ set up ZeroMQ connections and register commands""" self.commands = {} self.api = {} - self.context = zmq.Context() - self._recv = self.context.socket(zmq.SUB) - self._recv.connect("tcp://localhost:10101") - self._recv.setsockopt_string(zmq.SUBSCRIBE, self.prefix) - print(("bureau " + self.name + " waiting for messages")) - self._send = self.context.socket(zmq.PUB) - self._send.connect("tcp://localhost:10100") - - # update_commands(self) - # self.registerCommands() + if not os.path.exists(".screenless"): + os.mkdir(".screenless") + self.context = zmq.Context() + self._recv = self.context.socket(zmq.REP) + self._recv.bind("ipc://.screenless/" + self.prefix + ".ipc") + print(("bureau " + self.name + " waiting for messages")) print("commands: ") print(self.commands) - def send(self, message, data=None): + def send(self, recipient, message, data=None): + message += "." if data: message += json.dumps(data) - self._send.send_string(message) + sender = self.context.socket(zmq.REQ) + sender.connect("ipc://.screenless/" + recipient + ".ipc") + sender.send_string(message) + # TODO: retry this a few times with a proper sleep/timeout + time.sleep(0.5) + try: + ret = json.loads(sender.recv(flags=zmq.NOBLOCK)) + except zmq.ZMQError: + print("message sent but got no reply...") + ret = None + return ret def _publish_methods(self): # register bureau with Inhuman Resources - bureau_json = json.dumps({"name": self.name, "prefix": self.prefix, - "desc": self.__doc__}) - self.send("IRaddbureau." + bureau_json) + bureau_detail = {"name": self.name, "prefix": self.prefix, + "desc": self.__doc__} + self.send("IR", "addbureau", bureau_detail) # find and store all published methods for member in dir(self): @@ -88,18 +95,18 @@ class Bureau: continue if hasattr(method, "command"): self.commands[method.command] = method - cmd_json = json.dumps({"cmdname": method.name, - "prefix": self.prefix, - "cmd": method.command, - "desc": method.__doc__}) - self.send("IRaddcommand." + cmd_json) + cmd_detail = {"cmdname": method.name, + "prefix": self.prefix, + "cmd": method.command, + "desc": method.__doc__} + self.send("IR", "addcommand", cmd_detail) elif hasattr(method, "api"): self.api[method.api] = method - cmd_json = json.dumps({"apiname": method.name, - "prefix": self.prefix, - "api": method.api, - "desc": method.__doc__}) - self.send("IRaddapi." + cmd_json) + api_detail = {"apiname": method.name, + "prefix": self.prefix, + "api": method.api, + "desc": method.__doc__} + self.send("IR", "addapi", api_detail) print("registered:") print(self.commands) @@ -132,9 +139,10 @@ class Bureau: lp.close() @add_command("test") - def test(self): + def test(self, data): # stupid test to see if modules work print(("hi! testing. " + self.name + " bureau seems to work!")) + return b"seems to work." def run_io(self): """process hardware or timed input @@ -163,12 +171,16 @@ class Bureau: self._publish_methods() while True: - msg = self._recv.recv_string() + try: + msg = self._recv.recv_string(flags=zmq.NOBLOCK) + except zmq.ZMQError: + time.sleep(0.05) # don't waste CPU + continue try: dot = msg.find(".") - ref = msg[2:dot] + ref = msg[:dot] if (dot < len(msg) - 1) and (dot > 0): - data = json.loads(msg[dot + 1:]) + data = msg[dot + 1:] else: data = None print("data: " + str(data)) @@ -180,20 +192,29 @@ class Bureau: if (ref in self.commands) or (ref in self.api): # catch TypeErrors for case of bogus params try: - if data: - self.api[ref](data) + if ref in self.api: + data = json.loads(data) + ret = self.api[ref](data) else: - self.commands[ref]() + ret = self.commands[ref](data) + if ret is None: + ret = "" + ret = b"0" + ret + self._recv.send(ret) except TypeError as e: print(e) print("invalid data for command '{}': {}".format(ref, data)) + self._recv.send_unicode("Error. Invalid or missing data.") except KeyError as e: print(e) print("You are calling a command as an API or vice-versa.") + self._recv.send_unicode( + "Error. Command called as API or API as command.") else: print("error! Command/API %s not found", ref) + self._recv.send_unicode("Error! Command/API not found.") if __name__ == "__main__": - test = Bureau("test", "00") + test = Bureau() test.run()