|
|
|
import codecs
|
|
|
|
import html
|
|
|
|
import os.path
|
|
|
|
import random
|
|
|
|
import textwrap
|
|
|
|
import string
|
|
|
|
import urllib
|
|
|
|
|
|
|
|
from escpos import printer
|
|
|
|
import facebook
|
|
|
|
import PIL
|
|
|
|
import requests
|
|
|
|
import twitter
|
|
|
|
|
|
|
|
from bureau import Bureau, add_command, add_api
|
|
|
|
|
|
|
|
class TWrapper:
    """
    Empty attribute holder.

    PublicRelations creates one instance and stores its Twitter API client
    on it as the attribute ``t``.
    """
|
|
|
|
|
|
|
|
class PublicRelations(Bureau):
    """
    The Public relations department manages the flow of information between
    the screenless office and the general public. It provides interfaces
    for Twitter, Facebook, Mastodon and other electronic PR platforms.
    """

    name = "Public Relations"
    prefix = "PR"
    version = 0

    def __init__(self):
        """
        Set up the Twitter client and the short-code database.

        Runs the interactive ``twitter.oauth_dance`` the first time, when
        the credentials file ``~/.screenless/tw_creds`` does not exist yet.
        """
        Bureau.__init__(self)

        # The application key and secret are kept rot13-encoded in the source
        # and decoded at start-up.
        CK = codecs.decode("neV4HPasZrXMjNaliWWVUIaHA",
                           "rot13")
        CS = codecs.decode("ntegmiu3rdFAyMczCWM0bgpydFHwYXV3WedhtPCec1Pu9qdfGy",
                           "rot13")

        TW_CREDS = os.path.expanduser('~/.screenless/tw_creds')
        if not os.path.exists(TW_CREDS):
            twitter.oauth_dance("The Screenless Office", CK, CS,
                                TW_CREDS)

        oauth_token, oauth_secret = twitter.read_token_file(TW_CREDS)

        self.t = TWrapper()
        self.auth = twitter.OAuth(oauth_token, oauth_secret, CK, CS)
        self.t.t = twitter.Twitter(auth=self.auth)

        # setup DBs to map short codes to tweet ids
        self.tweetdb = self.dbenv.open_db(b"tweetdb")

    def get_tweet_id(self, tweet_hash):
        """
        take a short code and look up the tweet id

        Returns the stored id as an int, or None when nothing usable is
        stored under the short code.
        """
        with self.dbenv.begin(db=self.tweetdb) as txn:
            tweetid = txn.get(tweet_hash.encode())
        if tweetid:
            return int(tweetid)
        return None

    def short_tweet_id(self, tweet_id):
        """
        take a tweet id and return a short alphanumeric code

        A fresh random 5 character code is generated on every call and the
        mapping code -> tweet id is written to the tweet database.
        The id may be given as a string or as an int.
        """
        shortcode = ''.join(random.choice(string.ascii_letters + string.digits)
                            for _ in range(5))
        with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
            # str() lets callers pass the numeric id as well as "id_str"
            txn.put(shortcode.encode(), str(tweet_id).encode())
        return shortcode

    def _lookup_tweet(self, data):
        """
        Resolve command data to a tweet id (int), or None if it is unknown.

        Short codes only contain letters and digits, so anything from the
        first "." onwards is not part of the code and is ignored.
        """
        shortcode = data.partition(".")[0]
        tweet_id = self.get_tweet_id(shortcode)
        if tweet_id is None:
            self.log.debug("no tweet id found for short code:" + shortcode)
        return tweet_id

    def _open_printer(self):
        """
        Open the USB receipt printer with the ids/endpoints used by this
        bureau and select the cp437 codepage.
        """
        prn = printer.Usb(0x416, 0x5011, in_ep=0x81, out_ep=0x03)
        prn.codepage = "cp437"
        return prn

    def _print_media_image(self, prn, i_url):
        """
        Download the image at i_url into /tmp and print it on prn.
        """
        # Import the submodules explicitly: a bare "import urllib" or
        # "import PIL" does not guarantee that they are loaded.
        import urllib.request
        from PIL import Image, ImageOps

        filename = "/tmp/" + i_url.rsplit('/', 1)[1]
        print("fetching", i_url, filename)
        urllib.request.urlretrieve(i_url, filename)
        im = Image.open(filename)
        if im.mode in ("L", "RGB", "P"):
            im = ImageOps.equalize(im)
        # LANCZOS is the filter formerly called ANTIALIAS; that alias was
        # removed in Pillow 10.
        # NOTE(review): 576 is presumably the printer width in dots - confirm
        im.thumbnail((576, 576), Image.LANCZOS)
        prn.image(im, impl="bitImageColumn")

    def _print_tweet(self, prn, tweet):
        """
        Print one tweet on prn: underlined user name, wrapped text and,
        if the tweet has media entities, the first attached image.
        """
        # TODO: add fancier formatting i.e. inverted text for username/handle
        prn.set(text_type="U")
        prn._raw(tweet["user"]["name"].encode("cp437", "ignore"))
        prn.text("\r\n")
        prn.set(text_type="NORMAL")

        # Characters that cp437 cannot represent are dropped.
        twtext = html.unescape(tweet["text"])
        t_wrapped = textwrap.fill(twtext, width=48) + "\r\n"
        prn._raw(t_wrapped.encode("cp437", "ignore"))

        media = tweet["entities"].get("media")
        if media:
            self._print_media_image(prn, media[0]["media_url"])

    @add_command("tweetpic", "Post a Document Camera Image to Twitter")
    def tweet_pic(self):
        """
        Takes a photograph using the document camera and posts it to Twitter.
        """
        # The "PX" bureau replies with the path of the photo file.
        photo = self.send("PX", "photo")["photo"]

        with open(photo, "rb") as imagefile:
            imagedata = imagefile.read()

        # Media has to be uploaded through the separate upload domain first;
        # the returned media id is then attached to the status update.
        t_up = twitter.Twitter(domain='upload.twitter.com', auth=self.auth)
        id_img1 = t_up.media.upload(media=imagedata)["media_id_string"]
        self.t.t.statuses.update(status="#screenless", media_ids=id_img1)

    @add_command("fbpost", "Post to Facebook")
    def post_fb(self):
        """
        Takes a photograph using the document camera and posts it to Facebook.
        """
        photo = self.send("PX", "photo")["photo"]

        # NOTE(review): hard-coded access token in source - consider moving
        # it to a credentials file like the Twitter tokens.
        access_token = 'EAADixisn70ABADh2rEMZAYA8nGzd6ah8RFZA3URba263aCQ63ajLeTiZC5sgZCyIVSmRZBWReVsO9IuaLibX5RjW9Ja2tTZAbxgrDr1dPJzyGwcGTSV9bW1W4NigN0d9dFIH35W2fZBOkhvuLqOCDCBacIPjXPMxF7DRGyrz5lVHxTc04OlBeRX'
        page_id = "screenless"
        graph = facebook.GraphAPI(access_token)
        print("uploading photo " + photo)
        # Close the image file once the upload call has returned.
        with open(photo, 'rb') as imagefile:
            graph.put_photo(image=imagefile, album_path=page_id + "/photos",
                            message='#screenless')

    @add_command("twtimeline", "Print Recent Tweets")
    def tw_timeline(self, data=None):
        """
        Print some recent tweets from your home timeline. Default is 10.

        data, if given, is expected to be a mapping with a "count" entry.
        When that entry is missing a message is printed to stdout and the
        default count is used.
        """
        count = 10
        if data:
            try:
                count = data["count"]
            except (KeyError, TypeError):
                print("You need to specify how many tweets you want!")

        prn = self._open_printer()

        tweets = self.t.t.statuses.home_timeline(count=count)
        for tweet in tweets:
            self._print_tweet(prn, tweet)

            # Barcode that triggers the "twd" (details) command for this tweet.
            tw_shortcode = self.short_tweet_id(tweet["id_str"])
            prn.barcode("PRtwd." + tw_shortcode, "CODE128", function_type="B")
            prn.text("\r\n\r\n")

        prn.cut()

    @add_command("twd", "Print Tweet Details")
    def tw_details(self, data):
        """
        Print detailed tweet info and commands for reply, like, retweet, etc.

        data is the short code of the tweet, optionally followed by "." and
        a suffix that is ignored. Nothing is printed for an unknown code.
        """
        tweet_id = self._lookup_tweet(data)
        if tweet_id is None:
            return
        self.log.debug("tweet details for id:" + str(tweet_id))
        tweet = self.t.t.statuses.show(id=tweet_id)

        prn = self._open_printer()
        self._print_tweet(prn, tweet)

        # Barcodes that trigger the "twrt" (retweet) and "twlk" (like)
        # commands for this tweet.
        tw_shortcode = self.short_tweet_id(tweet["id_str"])
        prn.barcode("PRtwrt." + tw_shortcode, "CODE128", function_type="B")
        prn.barcode("PRtwlk." + tw_shortcode, "CODE128", function_type="B")
        prn.text("\r\n\r\n")

        prn.cut()

    @add_command("twrt", "Re-Tweet")
    def tw_retweet(self, data):
        """
        Re-Tweet a tweet from someone else.

        data is the short code of the tweet; unknown codes are ignored.
        """
        tweet_id = self._lookup_tweet(data)
        if tweet_id is None:
            return
        self.t.t.statuses.retweet(id=tweet_id)

    @add_command("twre", "Reply to Tweet")
    def tw_reply(self, data):
        """
        Reply to a tweet.

        Not implemented yet: the command currently does nothing.
        """
        # TODO: look up tweet id
        #self.t.t.statuses.update(status="@AUTHOR_ID", media_ids=id_img1)
        return

    @add_command("twlk", "Like a Tweet")
    def tw_like(self, data):
        """
        'Like' a tweet.

        data is the short code of the tweet; unknown codes are ignored.
        """
        tweet_id = self._lookup_tweet(data)
        if tweet_id is None:
            return
        self.t.t.favorites.create(id=tweet_id)
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Create the Public Relations bureau and call its run() method."""
    PublicRelations().run()
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the bureau only when this file is executed as a script,
    # not when it is imported as a module.
    main()
|