# publications office for reading and writing from datetime import datetime import io import json import os import random import string import urllib.request, urllib.parse, urllib.error import bleach import code128 import feedparser import lxml.html import PIL from readability import readability import requests from bureau import Bureau, add_command, add_api class Publications(Bureau): """ The Publications Office serves as a kind of screenless content management system. Create, update and organize your sites while doing most of the work on paper or anything you can photograph. """ name = "Publications Office" prefix = "PB" version = 0 def __init__(self): Bureau.__init__(self) # set up db for published sites # TODO: rename this to something less ambiguous self.db = os.path.expanduser("~/.screenless/PB.data") if not os.path.exists(self.db): os.mkdir(self.db) # set up urldb for short-codes self.urldb = self.dbenv.open_db(b"urldb") def _make_shorturl(self, url): def _shortcode(): return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5)) # we only have about a billion so make sure we don't collide keys with self.dbenv.begin(write=True, db=self.urldb) as txn: res = "not None" while res is not None: tmpcode = _shortcode() res = txn.get(tmpcode.encode()) txn.put(tmpcode.encode(), url.encode()) return tmpcode def _get_url(self, shortcode): """look up a URL from a shortcode returns full unicode url """ with self.dbenv.begin(db=self.urldb) as txn: return txn.get(shortcode.encode()).decode() @add_command("new", "Create a new Publication/Site") def new_site(self): """ Create a new Publication/Site, set up config and tace a picture from the document camera as the index page. Finally, it will print out the main page with commands for working with the site. """ site_dir = os.path.join(self.db, "1") site_id = 1 while os.path.exists(site_dir): site_id += 1 site_dir = os.path.join(self.db, str(site_id)) os.mkdir(site_dir) root_d = {"template": "default", "id": site_id} with open(os.path.join(site_dir, "root.json", "w")) as root_json: root_json.write(json.dumps(root_d)) photo = self.send("PX", "photo")["photo"] # TODO: come up with a generic set of img form operations for Bureau # should map regions defined with percentages to names form_img = PIL.Image.open(photo) fx, fy = form_img.size title_region = (0, 0, 0.5 * fx, 0.125 * fy) title_img = form_img.crop(title_region) content_region = (0, 0.125 * fy, fx, fy) content_img = form_img.crop(content_region) def _update_page(self, site, page): pass @add_command("news", "Print a personalized daily newspaper") def daily_news(self): news = self._get_news() # TODO: get weather # TODO: get finance inbox = self.send("PO", "unread") date = datetime.today().strftime("%A %B %e, %Y") if inbox is None: inbox = [] # if IMAP times out just move on... self.print_full("news.html", news=news, inbox=inbox, date=date) @add_command("r", "Print a web page for reading") def print_url(self, data): """ Print out a web page for reading. The command requires a short-code, typically referenced via barcode. Short-codes refer to full resource URIs recorded in the Publications office 'urldb' database. """ shortcode, _ = data.split(".") with self.dbenv.begin(db=self.urldb) as txn: self.log.debug("looking up short-code:" + shortcode) url = txn.get(shortcode.encode('utf-8')) if not url: self.log.warning("no valid URL in db for short code: " + shortcode) return else: url = url.decode() # download page with requests headers = {'User-Agent': 'Mozilla/5.0'} try: resp = requests.get(url, timeout=20.0, headers=headers) except requests.ReadTimeout: self.log.warning("Timeout reading RSS feed %s", url) return # TODO: do we need to spit out an error? # re-render with readability doc = readability.Document(resp.text, url=url) timestamp = datetime.now().strftime("Sourced %d %B, %Y at %H:%M") html = lxml.html.document_fromstring(doc.summary()) notecount = 0 # store links then make corresponding svg barcodes for link in html.findall(".//a"): notecount += 1 tmpcode = self._make_shorturl(link.attrib["href"]) svg = code128.svg("PBr." + tmpcode) footnote = html.makeelement("div") footnote.attrib["class"] = "footnote" notetext = html.makeelement("div") notetext.text = str(notecount) + ". " + link.attrib["href"] footnote.append(notetext) footnote.append(lxml.html.fromstring(svg.encode())) html.append(footnote) self.print_full("article.html", title=doc.title(), article=lxml.html.tostring(html).decode("utf-8"), url=url, date=timestamp) def _get_news(self): """fetch a set of latest news entries from sources specified in config """ entries = [] # TODO: come up with a good way to make this configurable feeds = [("http://feeds.bbci.co.uk/news/world/rss.xml", 10), ("http://www.rbb-online.de/content/rbb/rbb/politik/uebersicht-kurznachrichten.xml/allitems=true/feed=rss/path=middleColumnList!teaserbox.xml", 8), ("http://feeds.arstechnica.com/arstechnica/index/", 10), ("http://feeds.feedburner.com/zerohedge/feed", 5), ("http://planet.python.org/rss20.xml", 5)] for source in feeds: url = source[0] num_entries = source[1] # get feed data with requests using a timeout try: resp = requests.get(url, timeout=20.0) except requests.ReadTimeout: self.log.warning("Timeout reading RSS feed %s", url) continue feed_data = io.BytesIO(resp.content) feed = feedparser.parse(feed_data) # work around if we don't have enough news if num_entries > len(feed.entries): num_entries = len(feed.entries) for _ in range(num_entries): entry = feed.entries.pop() entry.source = feed.feed.title entry.dbhash = self._make_shorturl(entry.link) entry.svg = code128.svg("PBr." + entry.dbhash) if hasattr(entry, "media_thumbnail"): entry.img = entry.media_thumbnail[-1]["url"] else: entry.img = " " # limit summary to the last space below 500 characters if len(entry.summary) > 500: end = entry.summary.rfind(" ", 0, 499) entry.summary = entry.summary[0:end] + "…" entry.summary = bleach.clean(entry.summary, strip=True) entries.append(entry) return entries def main(): pub = Publications() pub.run() if __name__ == "__main__": main()