You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
11 KiB
Python

import codecs
import html
import io
import os.path
import random
import textwrap
import string
import urllib
from escpos import printer
import facebook
from mastodon import Mastodon
import PIL
import requests
import twitter
from bureau import Bureau, add_command, add_api
class TWrapper():
pass
class PublicRelations(Bureau):
"""
The Public relations department manages the flow of information between
the screenless office and the general public. It provides interfaces
for Twitter, Facebook, Mastodon and other electronic PR platforms.
"""
name = "Public Relations"
prefix = "PR"
version = 0
def __init__(self):
Bureau.__init__(self)
CK = codecs.decode("neV4HPasZrXMjNaliWWVUIaHA",
"rot13")
CS = codecs.decode("ntegmiu3rdFAyMczCWM0bgpydFHwYXV3WedhtPCec1Pu9qdfGy",
"rot13")
TW_CREDS = os.path.expanduser('~/.screenless/tw_creds')
if not os.path.exists(TW_CREDS):
twitter.oauth_dance("The Screenless Office", CK, CS,
TW_CREDS)
oauth_token, oauth_secret = twitter.read_token_file(TW_CREDS)
self.t = TWrapper()
self.auth = twitter.OAuth(oauth_token, oauth_secret, CK, CS)
self.t.t = twitter.Twitter(auth=self.auth)
MASTO_CREDS = os.path.expanduser('~/.screenless/masto_creds')
if not os.path.exists(MASTO_CREDS):
Mastodon.create_app('screenless',
api_base_url='https://mastodon.social',
to_file=MASTO_CREDS)
self.masto = Mastodon(client_id=MASTO_CREDS,
api_base_url='https://mastodon.social')
masto_user = self.config["mastodon"]["user"]
masto_pass = self.config["mastodon"]["password"]
self.masto.log_in(masto_user, masto_pass)
# setup DBs to map short codes to tweet ids
self.tweetdb = self.dbenv.open_db(b"tweetdb")
def get_tweet_id(self, tweet_hash):
"""
take a short code and look up the tweet id
"""
with self.dbenv.begin(db=self.tweetdb) as txn:
tweetid = txn.get(tweet_hash.encode())
if tweetid:
return int(tweetid)
else:
return tweetid
def short_tweet_id(self, tweet_id):
"""
take a tweet id and return a short alphanumeric code
"""
shortcode = ''.join(random.choice(string.ascii_letters + string.digits)
for _ in range(5))
with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
txn.put(shortcode.encode(), tweet_id.encode())
return shortcode
@add_command("tweetpic", "Post a Document Camera Image to Twitter")
def tweet_pic(self):
"""
Takes a photograph using the document camera and posts it to Twitter.
"""
photo = self.send("PX", "photo")["photo"]
with open(photo, "rb") as imagefile:
imagedata = imagefile.read()
t_up = twitter.Twitter(domain='upload.twitter.com', auth=self.auth)
id_img1 = t_up.media.upload(media=imagedata)["media_id_string"]
self.t.t.statuses.update(status="#screenless", media_ids=id_img1)
@add_command("fbpost", "Post to Facebook")
def post_fb(self):
"""
Takes a photograph using the document camera and posts it to Facebook.
"""
photo = self.send("PX", "photo")["photo"]
access_token = 'EAADixisn70ABADh2rEMZAYA8nGzd6ah8RFZA3URba263aCQ63ajLeTiZC5sgZCyIVSmRZBWReVsO9IuaLibX5RjW9Ja2tTZAbxgrDr1dPJzyGwcGTSV9bW1W4NigN0d9dFIH35W2fZBOkhvuLqOCDCBacIPjXPMxF7DRGyrz5lVHxTc04OlBeRX'
page_id = "screenless"
graph = facebook.GraphAPI(access_token)
print("uploading photo " + photo)
graph.put_photo(image=open(photo, 'rb'), album_path=page_id + "/photos",
message='#screenless')
@add_command("twtimeline", "Print Recent Tweets")
def tw_timeline(self, data=None):
"""
Print some recent tweets from your home timeline. Default is 10.
"""
if data:
try:
count = data["count"]
except KeyError as err:
print("You need to specify how many tweets you want!")
else:
count = 10
prn = printer.Usb(0x416, 0x5011, in_ep=0x81, out_ep=0x03)
prn.codepage = "cp437"
# TODO: add fancier formatting i.e. inverted text for username/handle
tweets = self.t.t.statuses.home_timeline(count=count)
out = ""
for t in tweets:
prn.set(text_type="U")
username = t["user"]["name"].encode("cp437", "ignore")
prn._raw(username)
prn.text("\r\n")
prn.set(text_type="NORMAL")
twtext = html.unescape(t["text"])
t_wrapped = textwrap.fill(twtext, width=48) + "\r\n"
t_enc = t_wrapped.encode("cp437", "ignore")
prn._raw(t_enc)
if "media" in t["entities"]:
if len(t["entities"]["media"]) > 0:
i_url = t["entities"]["media"][0]["media_url"]
filename = i_url.rsplit('/',1)[1]
filename = "/tmp/" + filename
print("fetching", i_url, filename)
urllib.request.urlretrieve(i_url, filename)
im = PIL.Image.open(filename)
if im.mode in ("L", "RGB", "P"):
im = PIL.ImageOps.equalize(im)
im.thumbnail((576, 576), PIL.Image.ANTIALIAS)
prn.image(im, impl="bitImageColumn")
tw_shortcode = self.short_tweet_id(t["id_str"])
prn.barcode("PRtwd." + tw_shortcode, "CODE128", function_type="B")
prn.text("\r\n\r\n")
prn.cut()
@add_command("twd", "Print Tweet Details")
def tw_details(self, data):
"""
Print detailed tweet info and commands for reply, like, retweet, etc.
"""
shortcode, _ = data.split(".")
tweet_id = self.get_tweet_id(shortcode)
tweet = self.t.t.statuses.show(id=tweet_id)
prn = printer.Usb(0x416, 0x5011, in_ep=0x81, out_ep=0x03)
prn.codepage = "cp437"
prn.set(text_type="U")
username = tweet["user"]["name"].encode("cp437", "ignore")
prn._raw(username)
prn.text("\r\n")
prn.set(text_type="NORMAL")
twtext = html.unescape(tweet["text"])
t_wrapped = textwrap.fill(twtext, width=48) + "\r\n"
t_enc = t_wrapped.encode("cp437", "ignore")
prn._raw(t_enc)
if "media" in tweet["entities"]:
if len(tweet["entities"]["media"]) > 0:
i_url = tweet["entities"]["media"][0]["media_url"]
filename = i_url.rsplit('/',1)[1]
filename = "/tmp/" + filename
print("fetching", i_url, filename)
urllib.request.urlretrieve(i_url, filename)
im = PIL.Image.open(filename)
if im.mode in ("L", "RGB", "P"):
im = PIL.ImageOps.equalize(im)
im.thumbnail((576, 576), PIL.Image.ANTIALIAS)
prn.image(im, impl="bitImageColumn")
tw_shortcode = self.short_tweet_id(tweet["id_str"])
prn.text("retweet\r\n")
prn.barcode("PRtwrt." + tw_shortcode, "CODE128", function_type="B")
prn.text("like\r\n")
prn.barcode("PRtwlk." + tw_shortcode, "CODE128", function_type="B")
prn.text("\r\n\r\n")
prn.cut()
@add_command("twrt", "Re-Tweet")
def tw_retweet(self, data):
"""
Re-Tweet a tweet from someone else.
"""
shortcode, _ = data.split(".")
tweet_id = self.get_tweet_id(shortcode)
self.t.t.statuses.retweet(id=tweet_id)
@add_command("twre", "Reply to Tweet")
def tw_reply(self, data):
"""
Reply to a tweet.
"""
shortcode, _ = data.split(".")
tweet_id = self.get_tweet_id(shortcode)
# TODO: do the actual reply
# maybe this should be an optional arg for tweet_pic?
return
@add_command("twlk", "Like a Tweet")
def tw_like(self, data):
"""
'Like' a tweet.
"""
shortcode, _ = data.split(".")
tweet_id = self.get_tweet_id(shortcode)
self.t.t.favorites.create(_id=tweet_id)
@add_command("mare", "Boost a toot")
def boost_toot(self, data):
"""
Boost a toot (or whatever kind of Fediverse content)
"""
shortcode, _ = data.split(".")
toot_id = self.get_toot_id(shortcode)
self.masto.status_reblog(toot_id)
@add_command("mafv", "Favorite a toot")
def fav_toot(self, data):
"""
favorite a toot (or other kind of Fediverse content)
"""
shortcode, _ = data.split(".")
toot_id = self.get_toot_id(shortcode)
self.masto.status_favorite(toot_id)
@add_command("tootpic", "Post a Document Camera Image to the Fediverse")
def toot_pic(self):
"""
Takes a photograph using the document camera and posts it to the Fediverse (Mastodon, Pleroma, etc.)
"""
photo = self.send("PX", "photo")["photo"]
media = self.masto.media_post(photo)
post = self.masto.status_post("", media_ids=[media])
#self.log.debug(str(post))
@add_command("tootline", "Print Recent Toots")
def tootline(self, data=None):
"""
Print some recent entries from your Fediverse timeline. Default is 10.
"""
if data:
try:
count = data["count"]
except KeyError as err:
print("You need to specify how many toots you want!")
else:
count = 10
prn = printer.Usb(0x416, 0x5011, in_ep=0x81, out_ep=0x03)
prn.codepage = "cp437"
# TODO: add fancier formatting i.e. inverted text for username/handle
toots = self.masto.timeline(limit=count)
out = ""
for t in toots:
prn.set(text_type="U")
username = t.account.display_name.encode("cp437", "ignore")
prn._raw(username)
prn.text("\r\n")
prn.set(text_type="NORMAL")
ttext = html.unescape(t.content)
t_wrapped = textwrap.fill(ttext, width=48) + "\r\n"
t_enc = t_wrapped.encode("cp437", "ignore")
prn._raw(t_enc)
if len(t.media_attachments) > 0:
img = None
t.media_attachments.reverse()
for media in t.media_attachments:
if media.type == "image":
img = media
if img:
filename = img.url.rsplit('/',1)[1]
filename = "/tmp/" + filename
print("fetching", img.remote_url, filename)
req_data = requests.get(img.url)
im = PIL.Image.open(io.BytesIO(filename))
if im.mode in ("L", "RGB", "P"):
im = PIL.ImageOps.equalize(im)
im.thumbnail((576, 576), PIL.Image.ANTIALIAS)
prn.image(im, impl="bitImageColumn")
tw_shortcode = self.short_tweet_id(str(t["id"]))
prn.barcode("PRmad." + tw_shortcode, "CODE128", function_type="B")
prn.text("\r\n\r\n")
prn.cut()
def main():
pr = PublicRelations()
pr.run()
if __name__ == "__main__":
main()