You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
368 lines
14 KiB
Python
368 lines
14 KiB
Python
import codecs
|
|
import html
|
|
import io
|
|
import os.path
|
|
import random
|
|
import textwrap
|
|
import string
|
|
import urllib
|
|
|
|
import bleach
|
|
from escpos import printer
|
|
import facebook
|
|
from mastodon import Mastodon
|
|
import PIL
|
|
import requests
|
|
import twitter
|
|
|
|
from bureau import Bureau, add_command, add_api
|
|
|
|
class TWrapper():
|
|
pass
|
|
|
|
class PublicRelations(Bureau):
|
|
"""
|
|
The Public relations department manages the flow of information between
|
|
the screenless office and the general public. It provides interfaces
|
|
for Twitter, Facebook, Mastodon and other electronic PR platforms.
|
|
"""
|
|
|
|
name = "Public Relations"
|
|
prefix = "PR"
|
|
version = 0
|
|
|
|
def __init__(self):
|
|
Bureau.__init__(self)
|
|
|
|
CK = codecs.decode("neV4HPasZrXMjNaliWWVUIaHA",
|
|
"rot13")
|
|
CS = codecs.decode("ntegmiu3rdFAyMczCWM0bgpydFHwYXV3WedhtPCec1Pu9qdfGy",
|
|
"rot13")
|
|
|
|
|
|
TW_CREDS = os.path.expanduser('~/.screenless/tw_creds')
|
|
if not os.path.exists(TW_CREDS):
|
|
twitter.oauth_dance("The Screenless Office", CK, CS,
|
|
TW_CREDS)
|
|
|
|
oauth_token, oauth_secret = twitter.read_token_file(TW_CREDS)
|
|
|
|
self.t = TWrapper()
|
|
self.auth = twitter.OAuth(oauth_token, oauth_secret, CK, CS)
|
|
self.t.t = twitter.Twitter(auth=self.auth)
|
|
|
|
try:
|
|
MASTO_CREDS = os.path.expanduser('~/.screenless/masto_creds')
|
|
masto_server = self.config["mastodon"]["server"]
|
|
if not os.path.exists(MASTO_CREDS):
|
|
Mastodon.create_app('screenless',
|
|
api_base_url=masto_server,
|
|
to_file=MASTO_CREDS)
|
|
# TODO: catch the error when our token is too old and throw it out
|
|
self.masto = Mastodon(client_id=MASTO_CREDS,
|
|
api_base_url=masto_server)
|
|
masto_user = self.config["mastodon"]["user"]
|
|
masto_pass = self.config["mastodon"]["password"]
|
|
self.masto.log_in(masto_user, masto_pass)
|
|
except KeyError:
|
|
print("no mastodon config found.")
|
|
print("you can add a 'mastodon:' section to PR.yml with:")
|
|
print(" server: my.server.name")
|
|
print(" user: myuser")
|
|
print(" password: mypassword")
|
|
print("skipping masto login for now...")
|
|
|
|
# setup DBs to map short codes to tweet ids
|
|
self.tweetdb = self.dbenv.open_db(b"tweetdb")
|
|
with self.dbenv.begin(db=self.tweetdb) as txn:
|
|
self.last_mast_notif = txn.get(b"last_mast_notif")
|
|
|
|
def get_tweet_id(self, tweet_hash):
|
|
"""
|
|
take a short code and look up the tweet id
|
|
"""
|
|
with self.dbenv.begin(db=self.tweetdb) as txn:
|
|
tweetid = txn.get(tweet_hash.encode())
|
|
if tweetid:
|
|
return int(tweetid)
|
|
else:
|
|
return tweetid
|
|
|
|
def short_tweet_id(self, tweet_id):
|
|
"""
|
|
take a tweet id and return a short alphanumeric code
|
|
"""
|
|
shortcode = ''.join(random.choice(string.ascii_letters + string.digits)
|
|
for _ in range(5))
|
|
with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
|
|
txn.put(shortcode.encode(), tweet_id.encode())
|
|
return shortcode
|
|
|
|
@add_command("tweetpic", "Post a Document Camera Image to Twitter")
|
|
def tweet_pic(self, reply_to_id=None, at_user=None):
|
|
"""
|
|
Takes a photograph using the document camera and posts it to Twitter.
|
|
"""
|
|
photo = self.send("PX", "photo")["photo"]
|
|
|
|
with open(photo, "rb") as imagefile:
|
|
imagedata = imagefile.read()
|
|
t_up = twitter.Twitter(domain='upload.twitter.com', auth=self.auth)
|
|
id_img1 = t_up.media.upload(media=imagedata)["media_id_string"]
|
|
|
|
if reply_to_id:
|
|
if at_user:
|
|
status = "@" + at_user
|
|
else:
|
|
status = " "
|
|
self.t.t.statuses.update(status=status,
|
|
in_reply_to_status_id=reply_to_id,
|
|
media_ids=id_img1)
|
|
else:
|
|
self.t.t.statuses.update(status="#screenless", media_ids=id_img1)
|
|
|
|
@add_command("fbpost", "Post to Facebook")
|
|
def post_fb(self):
|
|
"""
|
|
Takes a photograph using the document camera and posts it to Facebook.
|
|
"""
|
|
photo = self.send("PX", "photo")["photo"]
|
|
|
|
access_token = 'EAADixisn70ABADh2rEMZAYA8nGzd6ah8RFZA3URba263aCQ63ajLeTiZC5sgZCyIVSmRZBWReVsO9IuaLibX5RjW9Ja2tTZAbxgrDr1dPJzyGwcGTSV9bW1W4NigN0d9dFIH35W2fZBOkhvuLqOCDCBacIPjXPMxF7DRGyrz5lVHxTc04OlBeRX'
|
|
page_id = "screenless"
|
|
graph = facebook.GraphAPI(access_token)
|
|
print("uploading photo " + photo)
|
|
graph.put_photo(image=open(photo, 'rb'), album_path=page_id + "/photos",
|
|
message='#screenless')
|
|
|
|
@add_command("twtimeline", "Print Recent Tweets")
|
|
def tw_timeline(self, data=None):
|
|
"""
|
|
Print some recent tweets from your home timeline. Default is 10.
|
|
"""
|
|
if data:
|
|
try:
|
|
count = data["count"]
|
|
except KeyError as err:
|
|
print("You need to specify how many tweets you want!")
|
|
else:
|
|
count = 10
|
|
|
|
prn = self._get_small_printer()
|
|
# TODO: abstract this to use a simple templating system instead of raw
|
|
# TODO: add fancier formatting i.e. inverted text for username/handle
|
|
tweets = self.t.t.statuses.home_timeline(count=count,
|
|
tweet_mode="extended")
|
|
out = ""
|
|
for t in tweets:
|
|
prn.set(underline=1)
|
|
username = t["user"]["name"]
|
|
prn.textln(username)
|
|
prn.set()
|
|
twtext = html.unescape(t["full_text"])
|
|
prn.block_text(twtext, font="0")
|
|
prn.ln()
|
|
|
|
if "media" in t["entities"]:
|
|
if len(t["entities"]["media"]) > 0:
|
|
i_url = t["entities"]["media"][0]["media_url"]
|
|
filename = i_url.rsplit('/',1)[1]
|
|
filename = "/tmp/" + filename
|
|
print("fetching", i_url, filename)
|
|
urllib.request.urlretrieve(i_url, filename)
|
|
im = PIL.Image.open(filename)
|
|
if im.mode in ("L", "RGB", "P"):
|
|
im = PIL.ImageOps.equalize(im)
|
|
im.thumbnail((self.smprint["width"], 576), PIL.Image.ANTIALIAS)
|
|
prn.image(im, impl="bitImageColumn")
|
|
|
|
tw_shortcode = self.short_tweet_id(t["id_str"])
|
|
#prn.barcode("PRtwd." + tw_shortcode, "CODE128", function_type="B")
|
|
prn.soft_barcode("code128", "PRtwd." + tw_shortcode)
|
|
prn.ln(2)
|
|
|
|
prn.cut()
|
|
prn.close()
|
|
|
|
@add_command("twd", "Print Tweet Details")
|
|
def tw_details(self, data):
|
|
"""
|
|
Print detailed tweet info and commands for reply, like, retweet, etc.
|
|
"""
|
|
shortcode, _ = data.split(".")
|
|
tweet_id = self.get_tweet_id(shortcode)
|
|
tweet = self.t.t.statuses.show(id=tweet_id, tweet_mode="extended")
|
|
prn = self._get_small_printer()
|
|
|
|
prn.set(underline=1)
|
|
username = tweet["user"]["name"]
|
|
prn.textln(username)
|
|
prn.set()
|
|
twtext = html.unescape(tweet["full_text"])
|
|
prn.block_text(twtext, font="0")
|
|
|
|
if "media" in tweet["entities"]:
|
|
for entity in tweet["entities"]["media"]:
|
|
i_url = entity["media_url"]
|
|
filename = i_url.rsplit('/',1)[1]
|
|
filename = "/tmp/" + filename
|
|
print("fetching", i_url, filename)
|
|
urllib.request.urlretrieve(i_url, filename)
|
|
im = PIL.Image.open(filename)
|
|
if im.mode in ("L", "RGB", "P"):
|
|
im = PIL.ImageOps.equalize(im)
|
|
im.thumbnail((self.smprint["width"], 576), PIL.Image.ANTIALIAS)
|
|
prn.image(im, impl="bitImageColumn")
|
|
|
|
tw_shortcode = self.short_tweet_id(tweet["id_str"])
|
|
prn.text("retweet\r\n")
|
|
#prn.barcode("PRtwrt." + tw_shortcode, "CODE128", function_type="B")
|
|
prn.soft_barcode("code128", "PRtwrt." + tw_shortcode)
|
|
prn.text("like\r\n")
|
|
#prn.barcode("PRtwlk." + tw_shortcode, "CODE128", function_type="B")
|
|
prn.soft_barcode("code128", "PRtwlk." + tw_shortcode)
|
|
prn.text("\r\n\r\n")
|
|
#prn.barcode("PRtwre." + tw_shortcode, "CODE128", function_type="B")
|
|
prn.soft_barcode("code128", "PRtwre." + tw_shortcode)
|
|
prn.ln(2)
|
|
prn.cut()
|
|
prn.close()
|
|
|
|
@add_command("twrt", "Re-Tweet")
|
|
def tw_retweet(self, data):
|
|
"""
|
|
Re-Tweet a tweet from someone else.
|
|
"""
|
|
shortcode, _ = data.split(".")
|
|
tweet_id = self.get_tweet_id(shortcode)
|
|
self.t.t.statuses.retweet(id=tweet_id)
|
|
|
|
@add_command("twre", "Reply to Tweet")
|
|
def tw_reply(self, data):
|
|
"""
|
|
Reply to a tweet.
|
|
"""
|
|
shortcode, _ = data.split(".")
|
|
tweet_id = self.get_tweet_id(shortcode)
|
|
tweet = self.t.t.statuses.show(id=tweet_id)
|
|
at_user = tweet["user"]["screen_name"]
|
|
self.tweet_pic(tweet_id=tweet_id, at_user=at_user)
|
|
|
|
@add_command("twlk", "Like a Tweet")
|
|
def tw_like(self, data):
|
|
"""
|
|
'Like' a tweet.
|
|
"""
|
|
shortcode, _ = data.split(".")
|
|
tweet_id = self.get_tweet_id(shortcode)
|
|
self.t.t.favorites.create(_id=tweet_id)
|
|
|
|
@add_command("mare", "Boost a toot")
|
|
def boost_toot(self, data):
|
|
"""
|
|
Boost a toot (or whatever kind of Fediverse content)
|
|
"""
|
|
shortcode, _ = data.split(".")
|
|
toot_id = self.get_toot_id(shortcode)
|
|
self.masto.status_reblog(toot_id)
|
|
|
|
@add_command("mafv", "Favorite a toot")
|
|
def fav_toot(self, data):
|
|
"""
|
|
favorite a toot (or other kind of Fediverse content)
|
|
"""
|
|
shortcode, _ = data.split(".")
|
|
toot_id = self.get_toot_id(shortcode)
|
|
self.masto.status_favorite(toot_id)
|
|
|
|
@add_command("tootpic", "Post a Document Camera Image to the Fediverse")
|
|
def toot_pic(self):
|
|
"""
|
|
Takes a photograph using the document camera and posts it to the Fediverse (Mastodon, Pleroma, etc.)
|
|
"""
|
|
photo = self.send("PX", "photo")["photo"]
|
|
media = self.masto.media_post(photo)
|
|
post = self.masto.status_post("", media_ids=[media])
|
|
|
|
@add_command("tootline", "Print Recent Toots")
|
|
def tootline(self, data=None):
|
|
"""
|
|
Print some recent entries from your Fediverse timeline. Default is 10.
|
|
"""
|
|
if data:
|
|
try:
|
|
count = data["count"]
|
|
except KeyError as err:
|
|
print("You need to specify how many toots you want!")
|
|
else:
|
|
count = 10
|
|
|
|
prn = self._get_small_printer()
|
|
prn.codepage = "cp437"
|
|
|
|
# TODO: add fancier formatting i.e. inverted text for username/handle
|
|
# TODO: clean this up to use the built in formatting from escpos lib
|
|
# and make a print_status function to use for notifications too
|
|
toots = self.masto.timeline(limit=count)
|
|
out = ""
|
|
for t in toots:
|
|
prn.set(underline=1)
|
|
username = t.account.display_name
|
|
prn.textln(username)
|
|
prn.set()
|
|
ttext = bleach.clean(t.content, tags=[], strip=True)
|
|
ttext = html.unescape(ttext)
|
|
prn.block_text(ttext, font="0")
|
|
|
|
if len(t.media_attachments) > 0:
|
|
img = None
|
|
t.media_attachments.reverse()
|
|
for media in t.media_attachments:
|
|
if media.type == "image":
|
|
img = media
|
|
if img:
|
|
#filename = img.url.rsplit('/',1)[1]
|
|
#filename = "/tmp/" + filename
|
|
#print("fetching", img.remote_url, filename)
|
|
req_data = requests.get(img.url)
|
|
im = PIL.Image.open(io.BytesIO(req_data.content))
|
|
if im.mode in ("L", "RGB", "P"):
|
|
im = PIL.ImageOps.equalize(im)
|
|
im.thumbnail((self.smprint["width"], 576), PIL.Image.ANTIALIAS)
|
|
prn.image(im, impl="bitImageColumn")
|
|
|
|
tw_shortcode = self.short_tweet_id(str(t["id"]))
|
|
#prn.barcode("PRmad." + tw_shortcode, "CODE128", function_type="B")
|
|
prn.soft_barcode("code128", "PRmad." + tw_shortcode)
|
|
|
|
notifications = self.masto.notifications(since_id=self.last_mast_notif)
|
|
if len(notifications) > 0:
|
|
prn.set(bold=True)
|
|
prn.textln("NOTIFICATIONS:")
|
|
prn.set()
|
|
|
|
# store the last notification id
|
|
self.last_mast_notif = str(notifications[0].id)
|
|
with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
|
|
txn.put(b"last_mast_notif", self.last_mast_notif.encode())
|
|
for note in notifications:
|
|
username = note.account.display_name + " (" + note.account.acct + ")"
|
|
prn.text(note["type"] + " " + str(note["created_at"]) + " from ")
|
|
prn.textln(username + ":")
|
|
prn.textln(str(note["status"]))
|
|
with self.dbenv.begin(db=self.tweetdb, write=True) as txn:
|
|
txn.put(shortcode.encode(), tweet_id.encode())
|
|
|
|
prn.ln(2)
|
|
prn.cut()
|
|
prn.close()
|
|
|
|
|
|
def main():
|
|
pr = PublicRelations()
|
|
pr.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|