fixed ipc problems. added some error response handlers/messages.

workspace
Brendan Howell 9 years ago
parent b75211f42f
commit 88b9d48015

@ -5,6 +5,7 @@ import os.path
import subprocess import subprocess
import tempfile import tempfile
import threading import threading
import time
import zmq import zmq
from mako.template import Template from mako.template import Template
@ -41,7 +42,6 @@ def add_api(apistr, name=""):
return decorator return decorator
# @update_commands
class Bureau: class Bureau:
name = "TEST" name = "TEST"
prefix = "00" prefix = "00"
@ -53,31 +53,38 @@ class Bureau:
""" set up ZeroMQ connections and register commands""" """ set up ZeroMQ connections and register commands"""
self.commands = {} self.commands = {}
self.api = {} self.api = {}
self.context = zmq.Context()
self._recv = self.context.socket(zmq.SUB)
self._recv.connect("tcp://localhost:10101")
self._recv.setsockopt_string(zmq.SUBSCRIBE, self.prefix)
print(("bureau " + self.name + " waiting for messages"))
self._send = self.context.socket(zmq.PUB) if not os.path.exists(".screenless"):
self._send.connect("tcp://localhost:10100") os.mkdir(".screenless")
# update_commands(self)
# self.registerCommands()
self.context = zmq.Context()
self._recv = self.context.socket(zmq.REP)
self._recv.bind("ipc://.screenless/" + self.prefix + ".ipc")
print(("bureau " + self.name + " waiting for messages"))
print("commands: ") print("commands: ")
print(self.commands) print(self.commands)
def send(self, message, data=None): def send(self, recipient, message, data=None):
message += "."
if data: if data:
message += json.dumps(data) message += json.dumps(data)
self._send.send_string(message) sender = self.context.socket(zmq.REQ)
sender.connect("ipc://.screenless/" + recipient + ".ipc")
sender.send_string(message)
# TODO: retry this a few times with a proper sleep/timeout
time.sleep(0.5)
try:
ret = json.loads(sender.recv(flags=zmq.NOBLOCK))
except zmq.ZMQError:
print("message sent but got no reply...")
ret = None
return ret
def _publish_methods(self): def _publish_methods(self):
# register bureau with Inhuman Resources # register bureau with Inhuman Resources
bureau_json = json.dumps({"name": self.name, "prefix": self.prefix, bureau_detail = {"name": self.name, "prefix": self.prefix,
"desc": self.__doc__}) "desc": self.__doc__}
self.send("IRaddbureau." + bureau_json) self.send("IR", "addbureau", bureau_detail)
# find and store all published methods # find and store all published methods
for member in dir(self): for member in dir(self):
@ -88,18 +95,18 @@ class Bureau:
continue continue
if hasattr(method, "command"): if hasattr(method, "command"):
self.commands[method.command] = method self.commands[method.command] = method
cmd_json = json.dumps({"cmdname": method.name, cmd_detail = {"cmdname": method.name,
"prefix": self.prefix, "prefix": self.prefix,
"cmd": method.command, "cmd": method.command,
"desc": method.__doc__}) "desc": method.__doc__}
self.send("IRaddcommand." + cmd_json) self.send("IR", "addcommand", cmd_detail)
elif hasattr(method, "api"): elif hasattr(method, "api"):
self.api[method.api] = method self.api[method.api] = method
cmd_json = json.dumps({"apiname": method.name, api_detail = {"apiname": method.name,
"prefix": self.prefix, "prefix": self.prefix,
"api": method.api, "api": method.api,
"desc": method.__doc__}) "desc": method.__doc__}
self.send("IRaddapi." + cmd_json) self.send("IR", "addapi", api_detail)
print("registered:") print("registered:")
print(self.commands) print(self.commands)
@ -132,9 +139,10 @@ class Bureau:
lp.close() lp.close()
@add_command("test") @add_command("test")
def test(self): def test(self, data):
# stupid test to see if modules work # stupid test to see if modules work
print(("hi! testing. " + self.name + " bureau seems to work!")) print(("hi! testing. " + self.name + " bureau seems to work!"))
return b"seems to work."
def run_io(self): def run_io(self):
"""process hardware or timed input """process hardware or timed input
@ -163,12 +171,16 @@ class Bureau:
self._publish_methods() self._publish_methods()
while True: while True:
msg = self._recv.recv_string() try:
msg = self._recv.recv_string(flags=zmq.NOBLOCK)
except zmq.ZMQError:
time.sleep(0.05) # don't waste CPU
continue
try: try:
dot = msg.find(".") dot = msg.find(".")
ref = msg[2:dot] ref = msg[:dot]
if (dot < len(msg) - 1) and (dot > 0): if (dot < len(msg) - 1) and (dot > 0):
data = json.loads(msg[dot + 1:]) data = msg[dot + 1:]
else: else:
data = None data = None
print("data: " + str(data)) print("data: " + str(data))
@ -180,20 +192,29 @@ class Bureau:
if (ref in self.commands) or (ref in self.api): if (ref in self.commands) or (ref in self.api):
# catch TypeErrors for case of bogus params # catch TypeErrors for case of bogus params
try: try:
if data: if ref in self.api:
self.api[ref](data) data = json.loads(data)
ret = self.api[ref](data)
else: else:
self.commands[ref]() ret = self.commands[ref](data)
if ret is None:
ret = ""
ret = b"0" + ret
self._recv.send(ret)
except TypeError as e: except TypeError as e:
print(e) print(e)
print("invalid data for command '{}': {}".format(ref, data)) print("invalid data for command '{}': {}".format(ref, data))
self._recv.send_unicode("Error. Invalid or missing data.")
except KeyError as e: except KeyError as e:
print(e) print(e)
print("You are calling a command as an API or vice-versa.") print("You are calling a command as an API or vice-versa.")
self._recv.send_unicode(
"Error. Command called as API or API as command.")
else: else:
print("error! Command/API %s not found", ref) print("error! Command/API %s not found", ref)
self._recv.send_unicode("Error! Command/API not found.")
if __name__ == "__main__": if __name__ == "__main__":
test = Bureau("test", "00") test = Bureau()
test.run() test.run()

Loading…
Cancel
Save