Merge branch 'master' of gitlab.com:bhowell/the-screenless-office

workspace
Brendan Howell 6 years ago
commit 73d056561f

@ -1,13 +1,13 @@
import sys
from . import officemgr
from . import mgmt
def main(args=None):
if args is None:
args = sys.argv[1:]
print("starting Screenless Office...")
mgr = officemgr.OfficeManager()
print("initializing Screenless Office...")
mgr = mgmt.Management()
print("starting Office Manager...")
print("starting Management...")
mgr.run()

@ -1,4 +1,4 @@
import subprocess
import vlc
from bureau import Bureau, add_command
@ -17,7 +17,7 @@ class Audio(Bureau):
Bureau.__init__(self)
self.urldb = self.open_db("urldb")
subprocess.call(["mocp", "-S"])
self.player = vlc.MediaPlayer()
@add_command("p", "Play an album, track or a live stream.")
def play(self, data):
@ -31,44 +31,58 @@ class Audio(Bureau):
url = self.urldb.get(shortcode)
self.log.debug(" playing url " + url)
subprocess.call(["mocp", "-c"])
subprocess.call(["mocp", "-a", url])
subprocess.call(["mocp", "-p"])
self.player.set_mrl(url)
self.player.play()
@add_command("stop", "Halt audio playback.")
def stop(self):
"""
Stops all audio currently playing audio output.
"""
subprocess.call(["mocp", "-P"])
self.player.pause()
@add_command("resu", "Resume playback.")
def resume(self):
"""
Resume playback of paused audio.
"""
subprocess.call(["mocp", "-U"])
self.player.play()
@add_command("next", "Play the next song.")
def play_next(self):
"""
Skip to the next song in the playlist or album.
"""
subprocess.call(["mocp", "-f"])
#subprocess.call(["mocp", "-f"])
# TODO
pass
@add_command("prev", "Play the previous song.")
def play_prev(self):
"""
Skip to the previous song in the playlist or album.
"""
subprocess.call(["mocp", "-r"])
#subprocess.call(["mocp", "-r"])
# TODO
pass
@add_command("nowp", "Now Playing")
def now_playing(self):
"""
Prints the currently playing song or stream on the small printer.
"""
out = subprocess.check_output(["mocp", "-i"]).decode("utf-8")
#out = subprocess.check_output(["mocp", "-i"]).decode("utf-8")
# TODO: sort out how to do this with
out = "Now Playing: "
out += self.player.get_media().get_meta(vlc.Meta.Title) + "\n"
nowplaying = self.player.get_media().get_meta(vlc.Meta.NowPlaying)
if nowplaying == "":
out += "by " + self.player.get_media().get_meta(vlc.Meta.Title) + "\n"
out += "from the album '" + self.player.get_media().get_meta(vlc.Meta.Album) \
+ "'\n"
else:
out += nowplaying + "\n"
self.log.debug("info output:" + out)
self.print_small(out)

@ -7,9 +7,9 @@ import os.path
import random
import string
import subprocess
import sys
import tempfile
import textwrap
#import traceback
import threading
import lmdb
@ -60,6 +60,19 @@ def add_api(apistr, name=""):
return decorator
#def log_traceback(func):
# """ this is a decorator that catches tracebacks for logging"""
# def wrapper(*args):
# my_bureau = args[0]
# try:
# func(*args)
# except Exception as e:
# my_bureau.log.error("CRASH TRACE: {0}".format(my_bureau.name),
# exc_info=e)
# raise
# return wrapper
class LogPrinter(logging.Handler):
"""
LogPrinter prints logs on a receipt printer for screenless debugging.
@ -151,11 +164,6 @@ class Bureau(object):
self.api = {}
modpath = os.path.dirname(__file__)
#slimerjs = os.path.join(modpath, "..", "lib", "slimerjs", "slimerjs")
#renderer = os.path.join(modpath, "..", "slimerjs", "rasterize.js")
#self.slimerjs = os.path.abspath(slimerjs)
#self.html2pdf = self.slimerjs + " --headless " + \
# os.path.abspath(renderer) + " "
mypath = inspect.getfile(self.__class__)
self.mdir = os.path.dirname(mypath)
@ -181,7 +189,6 @@ class Bureau(object):
log_format = logging.Formatter('LOG ${levelname} $name: $message', style='$')
log_printer.setFormatter(log_format)
self.log.addHandler(log_printer)
sys.excepthook = self._log_exception
# setup a dir to store files and data
self.datadir = os.path.join(basepath, self.prefix)
@ -200,9 +207,9 @@ class Bureau(object):
self.log.debug("commands: ")
self.log.debug(str(self.commands))
def _log_exception(typ, value, tb):
self.log.error("CRASH TRACE: {0}".format(str(value)), exc_info=(typ, value, tb))
sys.__excepthook__(typ, value, tb)
# def _log_exception(typ, value, tb):
# self.log.error("CRASH TRACE: {0}".format(str(value)), exc_info=(typ, value, tb))
# sys.__excepthook__(typ, value, tb)
def open_db(self, name):
"""
@ -346,10 +353,15 @@ class Bureau(object):
"""
# TODO: make printer id/width configured and easy
prn = printer.Usb(0x416, 0x5011, in_ep=0x81, out_ep=0x03)
im = PIL.Image.open(img)
if type(img) is PIL.Image.Image:
im = img
else:
im = PIL.Image.open(img)
# NOTE: might be worth tring to push up brightness
im = PIL.ImageOps.equalize(im) # stretch histogram for nicer dither
im.thumbnail((576, 576), PIL.Image.ANTIALIAS) # resize to fit printer
im.thumbnail((576, 1024), PIL.Image.ANTIALIAS) # resize to fit printer
prn.image(im, impl="bitImageColumn") # not using this impl crashes ??
@add_command("test")
@ -361,6 +373,16 @@ class Bureau(object):
print(("hi! testing. " + self.name + " bureau seems to work!"))
return "seems to work."
def _run_io(self):
"""
wrapper for run_io so that we can catch threaded exceptions and log
"""
try:
self.run_io()
except Exception as err:
self.log.exception("%s CRASHED with %s\n", self.name, err)
raise
def run_io(self):
"""process hardware or timed input
@ -375,6 +397,16 @@ class Bureau(object):
pass
def run(self):
"""
wrapper running the main loop and logging all exceptions
"""
try:
self._run()
except Exception as err:
self.log.exception("%s CRASHED with %s\n", self.name, err)
raise
def _run(self):
"""
main loop for processing messages
@ -382,7 +414,7 @@ class Bureau(object):
"""
# start the hardware input handler
io_handler = threading.Thread(target=self.run_io)
io_handler = threading.Thread(target=self._run_io)
io_handler.start()
# register commands and api methods
@ -398,10 +430,6 @@ class Bureau(object):
msg = self._recv.recv_string(flags=zmq.NOBLOCK)
else:
continue
# msg = self._recv.recv_string(flags=zmq.NOBLOCK)
#except zmq.ZMQError:
# time.sleep(0.05) # don't waste CPU
# continue
try:
self.log.debug("got message:" + msg)
dot = msg.find(".")
@ -411,7 +439,6 @@ class Bureau(object):
self.log.debug("dot at %d", dot)
# TODO: maybe trim off the trailing "." for convenience
data = msg[dot + 1:]
# data = str(data) # force to be a string
else:
data = None
self.log.debug("data: " + str(data))

@ -1,6 +1,13 @@
import glob
import math
import mimetypes
import os
import subprocess
import urllib
from bureau import Bureau, add_command
from PIL import Image
from bureau import Bureau, add_command, add_api
class Humor(Bureau):
@ -24,6 +31,59 @@ class Humor(Bureau):
jux = str(subprocess.check_output("/usr/games/fortune"), encoding="UTF-8")
self.print_small(jux)
@add_api("gif", "Moving Picture")
def print_gif(self, data):
"""
Prints out a series of image frames which can be viewed in lively
motion on any standard zoetrope. (Ø200mm)
"""
# download the video file
d_url = data["url"]
filename, headers = urllib.request.urlretrieve(d_url)
print("fetching", d_url, filename)
# make sure we have a legit filename
ext = mimetypes.guess_extension(headers["Content-Type"])
if not filename.endswith(ext):
os.rename(filename, filename + ext)
filename = filename + ext
print("renamed to ", filename)
# if we have something that's a gif or webp (png?) then
# just print 12 frames
if filename.endswith(("gif", "webp")):
print("gif stuff")
img = Image.open(filename)
print(img.n_frames)
grab_frame = 0
out_frame = 0
in_len = float(img.n_frames)
# TODO: deal with frame counts lower than 12
# and maybe a different algo that includes endpoints
for frame in range(img.n_frames):
img.seek(frame)
if grab_frame == frame:
img_rotated = img.rotate(90, expand=True)
self.print_small_image(img_rotated)
out_frame += 1
grab_frame = math.ceil(out_frame * in_len / 12)
else:
# how many frames do we have?
cli = "ffprobe -i " + filename + \
" -show_format -v quiet | sed -n 's/duration=//p'"
vid_len = str(subprocess.check_output(cli), encoding="UTF-8")
print("video len: " + vid_len)
# TODO: if vid_len is not a number handle this error
# lengthen/shorten it to 12 frames
# dump frames to temp files
cli = "ffmpeg -i" + filename + " -vf fps=12/" + vid_len +\
" thumb%02d.png"
# print em out!
self.print_small("")
def main():
ha = Humor()

@ -49,6 +49,10 @@
width: 50%;
height: 50%;
}
.endnotebc {
width: 50%;
height: 50%;
}
</style>
<body>
<h1>${title}</h1>

@ -175,7 +175,7 @@ class Publications(Bureau):
#TODO: make this barcode inline thing a util method
encoded_svg = b64encode(bytes(svg, 'utf-8')).decode()
encoded_data = "data:image/svg+xml;charset=utf-8;base64," + encoded_svg
svg = '<img src="%s"/>' % encoded_data
svg = '<img class="endnotebc" src="%s"/>' % encoded_data
footnote.append(lxml.html.fromstring(svg))
html.append(footnote)

@ -292,6 +292,8 @@ class PublicRelations(Bureau):
prn.codepage = "cp437"
# TODO: add fancier formatting i.e. inverted text for username/handle
# TODO: clean this up to use the built in formatting from escpos lib
# and make a print_status function to use for notifications too
toots = self.masto.timeline(limit=count)
out = ""
for t in toots:
@ -324,8 +326,20 @@ class PublicRelations(Bureau):
tw_shortcode = self.short_tweet_id(str(t["id"]))
prn.barcode("PRmad." + tw_shortcode, "CODE128", function_type="B")
prn.text("\r\n\r\n")
notifications = self.masto.notifications()
if len(notifications) > 0:
prn.set(text_type="B")
prn.text("NOTIFICATIONS:\r\n")
prn.set(text_type="NORMAL")
for note in notifications:
username = t.account.display_name.encode("cp437", "ignore") + \
b" (" + t.account.acct.encode("cp437", "ignore") + b")"
prn.text(note["type"] + " " + str(note["created_at"]) + " from ")
prn._raw(username)
prn.text(":\r\n" + str(note.keys()) + "\r\n")
prn.text("\r\n\r\n")
prn.cut()

@ -22,7 +22,7 @@ class Sales(Bureau):
pass # we've already got a db folder
@add_command("p", "Play Media")
def print_fortune(self, data):
def play_media(self, data):
"""
Shows media on a connected projector or plays audio.
"""

@ -67,6 +67,7 @@ class TypingPool(Bureau):
def run_io(self):
val = ""
upper = False
ctrl = False
#TODO: this is crap, needs to be multi-threaded and have one
# such loop for each active device
self.active_devices[0].grab()
@ -74,7 +75,7 @@ class TypingPool(Bureau):
for ev in self.active_devices[0].read_loop():
if ev.type == evdev.ecodes.EV_KEY:
data = evdev.categorize(ev)
if data.keystate == 1:
if data.keystate == 1: # key-down
if data.scancode == 28:
print("sending barcode:", val)
self.send(val[0:2], val[2:])
@ -82,15 +83,27 @@ class TypingPool(Bureau):
else:
try:
new_key = KEYS[data.scancode]
if new_key == "LSHFT" or new_key == "RSHFT":
if ctrl and (new_key == "j"):
self.log.debug("ignoring line-feed")
elif new_key == "LSHFT" or new_key == "RSHFT":
upper = True
elif new_key == "LCTRL" or new_key == "RCTRL":
ctrl = True
else:
if upper:
new_key = new_key.upper()
upper = False
val += new_key
except KeyError:
print("Error invalid keycode:", data.scancode)
if data.keystate == 0: # key-up for mod-keys
try:
new_key = KEYS[data.scancode]
if new_key == "LSHFT" or new_key == "RSHFT":
upper = False
if new_key == "LCTRL" or new_key == "RCTRL":
ctrl = False
except KeyError:
print("Error invalid keycode:", data.scancode)
def main():

Loading…
Cancel
Save