import codecs
import html
import io
import os.path
import random
import textwrap
import string
import urllib

import bleach
from escpos import printer
import facebook
from mastodon import Mastodon
import PIL
# "import PIL" alone does not load the submodules; import the ones we use
# explicitly instead of relying on another library having imported them.
import PIL.Image
import PIL.ImageOps
import requests
import twitter

from bureau import Bureau, add_command, add_api


# Seconds to wait for a remote image before giving up.
_IMAGE_TIMEOUT = 30


def _split_shortcode(data):
    """
    Extract the short code from the data handed to a command.

    The data is split on "." and must yield exactly two parts; the first
    one is the short code. A ValueError is raised otherwise.
    NOTE(review): the exact format of `data` is decided by the Bureau
    dispatcher, which is not visible from this file -- confirm.
    """
    shortcode, _ = data.split(".")
    return shortcode


def _requested_count(data, noun, default=10):
    """
    Return data["count"] or `default` if no usable count was supplied.

    `noun` ("tweets", "toots") is only used in the hint that is printed
    when data was given but holds no "count" entry.
    """
    if not data:
        return default
    try:
        return data["count"]
    except (KeyError, TypeError):
        print("You need to specify how many " + noun + " you want!")
        return default


def _tweet_text(tweet):
    """
    Return the printable text of a tweet fetched in "extended" mode.

    Retweets are rendered as "RT from <name>" followed by the text of the
    original tweet. HTML entities are unescaped.
    """
    if "retweeted_status" in tweet:
        original = tweet["retweeted_status"]
        return ("RT from " + original["user"]["name"] + "\n"
                + html.unescape(original["full_text"]))
    return html.unescape(tweet["full_text"])


def _print_image(prn, im, max_size):
    """
    Equalize and downscale a PIL image, then send it to the printer.

    `max_size` is the (width, height) bounding box passed to thumbnail().
    """
    if im.mode in ("L", "RGB", "P"):
        im = PIL.ImageOps.equalize(im)
    # LANCZOS is the same filter as the ANTIALIAS alias that was removed
    # in Pillow 10.
    im.thumbnail(max_size, PIL.Image.LANCZOS)
    prn.image(im, impl="bitImageColumn")


def _print_image_from_url(prn, url, max_size):
    """
    Download an image into memory and print it (see _print_image).
    """
    print("fetching", url)
    response = requests.get(url, timeout=_IMAGE_TIMEOUT)
    im = PIL.Image.open(io.BytesIO(response.content))
    _print_image(prn, im, max_size)


def _print_command_barcode(prn, command, shortcode):
    """
    Print a code128 barcode that encodes "PR<command>.<shortcode>".
    """
    # TODO: submit a patch to escpos to have a quiet_zone parameter
    # as the default is a bit wide for 58mm printers
    prn.soft_barcode("code128", "PR" + command + "." + shortcode,
                     module_width=0.16)


def _print_toot(prn, toot):
    """
    Print the author line (underlined) and the plain text of a toot.
    """
    prn.set(underline=1)
    username = str(toot.account.display_name)
    acct = str(toot.account.acct)
    prn.textln(username + " (" + acct + ")")
    prn.set()
    # toot content is HTML: strip all tags, then decode the entities
    ttext = bleach.clean(toot.content, tags=[], strip=True)
    ttext = html.unescape(ttext)
    prn.block_text(ttext, font="0")
    prn.ln()


class TWrapper():
    """
    Empty holder object; the Twitter API client is stored on it as `.t`.
    """
    pass


class PublicRelations(Bureau):
    """
    The Public relations department manages the flow of information between
    the screenless office and the general public. It provides interfaces for
    Twitter, Facebook, Mastodon and other electronic PR platforms.
    """

    name = "Public Relations"
    prefix = "PR"
    version = 0

    def __init__(self):
        """
        Log in to Twitter and (if configured) Mastodon and open the
        database that maps printed short codes to tweet/toot ids.
        """
        Bureau.__init__(self)

        CK = codecs.decode("neV4HPasZrXMjNaliWWVUIaHA", "rot13")
        CS = codecs.decode("ntegmiu3rdFAyMczCWM0bgpydFHwYXV3WedhtPCec1Pu9qdfGy",
                           "rot13")

        TW_CREDS = os.path.expanduser('~/.screenless/tw_creds')
        if not os.path.exists(TW_CREDS):
            twitter.oauth_dance("The Screenless Office", CK, CS, TW_CREDS)

        oauth_token, oauth_secret = twitter.read_token_file(TW_CREDS)

        self.t = TWrapper()
        self.auth = twitter.OAuth(oauth_token, oauth_secret, CK, CS)
        self.t.t = twitter.Twitter(auth=self.auth)

        # Stays None when there is no mastodon configuration, so that the
        # attribute always exists.
        self.masto = None
        try:
            MASTO_CREDS = os.path.expanduser('~/.screenless/masto_creds')
            masto_server = self.config["mastodon"]["server"]
            if not os.path.exists(MASTO_CREDS):
                Mastodon.create_app('screenless',
                                    api_base_url=masto_server,
                                    to_file=MASTO_CREDS)

            # TODO: catch the error when our token is too old and throw it out
            self.masto = Mastodon(client_id=MASTO_CREDS,
                                  api_base_url=masto_server)
            masto_user = self.config["mastodon"]["user"]
            masto_pass = self.config["mastodon"]["password"]
            self.masto.log_in(masto_user, masto_pass)
        except KeyError:
            print("no mastodon config found.")
            print("you can add a 'mastodon:' section to PR.yml with:")
            print("  server: my.server.name")
            print("  user: myuser")
            print("  password: mypassword")
            print("skipping masto login for now...")

        # setup DBs to map short codes to tweet ids
        self.tweetdb = self.dbenv.open_db(b"tweetdb")
        with self.dbenv.begin(db=self.tweetdb) as txn:
            last_notif = txn.get(b"last_mast_notif")
        # Keep the id as str (or None): tootline() stores it as str as well.
        if last_notif is not None:
            last_notif = bytes(last_notif).decode()
        self.last_mast_notif = last_notif

    def get_tweet_id(self, tweet_hash):
        """
        take a short code and look up the tweet/toot id

        Returns the id as int, or the empty lookup result (None) if the
        short code is unknown.
        """
        with self.dbenv.begin(db=self.tweetdb) as txn:
            tweetid = txn.get(tweet_hash.encode())
        if tweetid:
            return int(tweetid)
        return tweetid

    def short_tweet_id(self, tweet_id):
        """
        take a tweet id and return a short alphanumeric code

        tweet_id must be a str. The code -> id mapping is stored in the
        tweet database so that get_tweet_id() can resolve it later.
        """
        alphabet = string.ascii_letters + string.digits
        shortcode = ''.join(random.choice(alphabet) for _ in range(5))
        with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
            txn.put(shortcode.encode(), tweet_id.encode())
        return shortcode

    @add_command("tweetpic", "Post a Document Camera Image to Twitter")
    def tweet_pic(self, reply_to_id=None, at_user=None):
        """
        Takes a photograph using the document camera and posts it to Twitter.

        If reply_to_id is given the picture is posted as a reply to that
        tweet, mentioning at_user if one is given.
        """
        photo = self.send("PX", "photo")["photo"]
        with open(photo, "rb") as imagefile:
            imagedata = imagefile.read()
        t_up = twitter.Twitter(domain='upload.twitter.com', auth=self.auth)
        id_img1 = t_up.media.upload(media=imagedata)["media_id_string"]
        if reply_to_id:
            if at_user:
                status = "@" + at_user
            else:
                status = " "
            self.t.t.statuses.update(status=status,
                                     in_reply_to_status_id=reply_to_id,
                                     media_ids=id_img1)
        else:
            self.t.t.statuses.update(status="#screenless", media_ids=id_img1)

    @add_command("fbpost", "Post to Facebook")
    def post_fb(self):
        """
        Takes a photograph using the document camera and posts it to Facebook.
        """
        photo = self.send("PX", "photo")["photo"]
        # NOTE(security): this access token is hard-coded in the source; it
        # should be moved to the bureau configuration and rotated.
        access_token = 'EAADixisn70ABADh2rEMZAYA8nGzd6ah8RFZA3URba263aCQ63ajLeTiZC5sgZCyIVSmRZBWReVsO9IuaLibX5RjW9Ja2tTZAbxgrDr1dPJzyGwcGTSV9bW1W4NigN0d9dFIH35W2fZBOkhvuLqOCDCBacIPjXPMxF7DRGyrz5lVHxTc04OlBeRX'
        page_id = "screenless"
        graph = facebook.GraphAPI(access_token)
        print("uploading photo " + photo)
        # close the photo file once the upload is done
        with open(photo, 'rb') as image:
            graph.put_photo(image=image, album_path=page_id + "/photos",
                            message='#screenless')

    @add_command("twtimeline", "Print Recent Tweets")
    def tw_timeline(self, data=None):
        """
        Print some recent tweets from your home timeline.
        Default is 10.
        """
        count = _requested_count(data, "tweets")

        prn = self._get_small_printer()

        # TODO: abstract this to use a simple templating system instead of raw
        tweets = self.t.t.statuses.home_timeline(count=count,
                                                 tweet_mode="extended")
        for t in tweets:
            prn.set(underline=1)
            prn.textln(t["user"]["name"])
            prn.set()
            prn.block_text(_tweet_text(t), font="0")
            prn.ln()

            # only the first attached image is printed in the overview
            media = t["entities"].get("media")
            if media:
                _print_image_from_url(prn, media[0]["media_url"],
                                      (self.smprint["width"], 960))

            # barcode that prints the tweet details when scanned
            tw_shortcode = self.short_tweet_id(t["id_str"])
            _print_command_barcode(prn, "twd", tw_shortcode)

        prn.ln(2)
        prn.cut()
        prn.close()

    @add_command("twd", "Print Tweet Details")
    def tw_details(self, data):
        """
        Print detailed tweet info and commands for reply, like, retweet, etc.
        """
        shortcode = _split_shortcode(data)
        tweet_id = self.get_tweet_id(shortcode)
        tweet = self.t.t.statuses.show(id=tweet_id, tweet_mode="extended")

        prn = self._get_small_printer()
        prn.set(underline=1)
        prn.textln(tweet["user"]["name"])
        prn.set()
        prn.block_text(_tweet_text(tweet) + "\n", font="0")

        for entity in tweet["entities"].get("media", []):
            _print_image_from_url(prn, entity["media_url"],
                                  (self.smprint["width"], 576))

        # The scanned short code already maps to this tweet, so reuse it
        # for the action barcodes instead of storing yet another code.
        prn.text("Retweet\r\n")
        _print_command_barcode(prn, "twrt", shortcode)
        prn.text("Like\r\n")
        _print_command_barcode(prn, "twlk", shortcode)
        prn.text("Reply\r\n")
        _print_command_barcode(prn, "twre", shortcode)

        prn.ln(2)
        prn.cut()
        prn.close()

    @add_command("twrt", "Re-Tweet")
    def tw_retweet(self, data):
        """
        Re-Tweet a tweet from someone else.
        """
        tweet_id = self.get_tweet_id(_split_shortcode(data))
        self.t.t.statuses.retweet(id=tweet_id)

    @add_command("twre", "Reply to Tweet")
    def tw_reply(self, data):
        """
        Reply to a tweet.
        """
        tweet_id = self.get_tweet_id(_split_shortcode(data))
        tweet = self.t.t.statuses.show(id=tweet_id)
        at_user = tweet["user"]["screen_name"]
        # tweet_pic's parameter is called reply_to_id (not tweet_id)
        self.tweet_pic(reply_to_id=tweet_id, at_user=at_user)

    @add_command("twlk", "Like a Tweet")
    def tw_like(self, data):
        """
        'Like' a tweet.
        """
        tweet_id = self.get_tweet_id(_split_shortcode(data))
        self.t.t.favorites.create(_id=tweet_id)

    @add_command("mare", "Boost a toot")
    def boost_toot(self, data):
        """
        Boost a toot (or whatever kind of Fediverse content)
        """
        toot_id = self.get_tweet_id(_split_shortcode(data))
        self.masto.status_reblog(toot_id)

    @add_command("mafv", "Favorite a toot")
    def fav_toot(self, data):
        """
        favorite a toot (or other kind of Fediverse content)
        """
        toot_id = self.get_tweet_id(_split_shortcode(data))
        self.masto.status_favorite(toot_id)

    @add_command("marp", "Reply to a toot")
    def reply_toot(self, data):
        """
        Reply to a toot with the image under the document camera.
        """
        toot_id = self.get_tweet_id(_split_shortcode(data))
        self.toot_pic(reply_to=toot_id)

    @add_command("toot", "Post a Document Camera Image to the Fediverse")
    def toot_pic(self, reply_to=None):
        """
        Takes a photograph using the document camera and posts it to the
        Fediverse (Mastodon, Pleroma, etc.)
        """
        # TODO: maybe add @ocrbot mention or even better run OCR locally
        # and use that as status and description
        photo = self.send("PX", "photo")["photo"]
        media = self.masto.media_post(photo)
        self.masto.status_post("", in_reply_to_id=reply_to, media_ids=[media])

    @add_command("mad", "Print Detailed Toot")
    def showtoot(self, data):
        """
        Prints out a detailed version of a fediverse post with all media.
        Allows various social and public relations management.
        """
        shortcode = _split_shortcode(data)
        toot_id = self.get_tweet_id(shortcode)
        t = self.masto.status(toot_id)

        prn = self._get_small_printer()
        print("toot from:" + str(t.account))
        _print_toot(prn, t)

        for media in t.media_attachments:
            if media.type == "image":
                _print_image_from_url(prn, media.url,
                                      (self.smprint["width"], 960))

        prn.textln("Boost")
        _print_command_barcode(prn, "mare", shortcode)
        prn.textln("Favorite")
        _print_command_barcode(prn, "mafv", shortcode)
        prn.textln("Reply")
        # "marp" is the reply command ("mad" would just print this again)
        _print_command_barcode(prn, "marp", shortcode)

        prn.ln(2)
        prn.cut()
        prn.close()

    @add_command("tootline", "Print Recent Toots")
    def tootline(self, data=None):
        """
        Print some recent entries from your Fediverse timeline.
        Default is 10.
        """
        count = _requested_count(data, "toots")

        prn = self._get_small_printer()

        # TODO: add fancier formatting i.e. inverted text for username/handle
        # TODO: clean this up to use the built in formatting from escpos lib
        #       and make a print_status function to use for notifications too
        self.masto.debug_requests = True
        toots = self.masto.timeline(limit=count)
        self.masto.debug_requests = False

        for t in toots:
            _print_toot(prn, t)

            # only the first attached image is printed in the overview
            img = next((media for media in t.media_attachments
                        if media.type == "image"), None)
            if img:
                _print_image_from_url(prn, img.url,
                                      (self.smprint["width"], 960))

            # short_tweet_id() also stores the code -> id mapping
            shortcode = self.short_tweet_id(str(t["id"]))
            _print_command_barcode(prn, "mad", shortcode)

        notifications = self.masto.notifications(since_id=self.last_mast_notif)
        if len(notifications) > 0:
            prn.set(bold=True)
            prn.textln("NOTIFICATIONS:")
            prn.set()
            # store the last notification id
            self.last_mast_notif = str(notifications[0].id)
            with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
                txn.put(b"last_mast_notif", self.last_mast_notif.encode())

        for note in notifications:
            username = (str(note.account.display_name)
                        + " (" + str(note.account.acct) + ")")
            prn.text(note["type"] + " " + str(note["created_at"]) + " from ")
            prn.textln(username + ":")
            # not every notification type (e.g. a follow) carries a status
            status = note.get("status")
            if status is not None:
                prn.textln(str(status))

        prn.ln(2)
        prn.cut()
        prn.close()


def main():
    """
    Create the Public Relations bureau and run it.
    """
    pr = PublicRelations()
    pr.run()


if __name__ == "__main__":
    main()