
Archiving the sandbox

Archiving files / projects from XPUB class of 2020 and 2021.

The starting point is the set of files on the local Raspberry Pi server.

In the "spider", the patterns you add are regular expressions that do two things: they filter the URLs you want to copy (matching a pattern means the URL becomes part of the archive), and they specify how to rewrite that URL to a local path.

URLs that don't match any pattern stay "as-is", so you can still have links "outside" to external things.

URLs that match a pattern get relativized to a local path, and the spider makes sure the linked files also get downloaded (and spidered in turn).
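
As a rough sketch of the idea (not part of the notebook itself; the URL and pattern are placeholders for what gets configured further down), a single pattern applied with re.sub might look like this:

In [ ]:
import re

# A pattern both *selects* this URL for archiving and *rewrites* it to a local path
pattern = re.compile(r"^https?://hub\.xpub\.nl/sandbox/(.+)$")
url = "https://hub.xpub.nl/sandbox/2020/project/index.html"
if pattern.search(url):
    local_path = pattern.sub(r"\g<1>", url)   # -> '2020/project/index.html'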

In [1]:
import requests
from urllib.parse import urljoin, urlparse, quote as urlquote, unquote as urlunquote
import html5lib
from xml.etree import ElementTree as ET
import os
import re
import sys
from hashlib import md5

import urllib3
# https://stackoverflow.com/questions/27981545/suppress-insecurerequestwarning-unverified-https-request-is-being-made-in-pytho
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Three ways to write a backreference in re:

\g<name>
\g<1>
\1
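
For example, a small re.sub call showing the three spellings (the pattern and input string here are just placeholders):

In [ ]:
import re

re.sub(r"(?P<name>\d+)-(\d+)", r"\g<name>:\g<2>", "12-34")   # named and numbered \g<> forms -> '12:34'
re.sub(r"(\d+)-(\d+)", r"\2-\1", "12-34")                    # plain \1 / \2 form -> '34-12'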

It's really useful when saving resources to use the appropriate/conventional file extension for that kind of file. URLs don't always include a filename with an extension; instead, HTTP specifies the file type in the "Content-Type" header, whose value is a MIME type (a convention originally developed for labelling file types in email attachments). The EXT dictionary maps the MIME types we expect to receive to suitable file extensions.

In [23]:
EXT = {
    "text/html": "html",
    "text/css": "css",
    "image/jpeg": "jpg",
    "image/png": "png",
    "image/gif": "gif",
    "application/javascript": "js",
    "text/javascript": "js",
    "video/webm": "webm"
}

def memoize(f):
    memo = {}
    def helper(x):
        if x not in memo:            
            memo[x] = f(x)
        return memo[x]
    return helper

@memoize
def ext_for (url):
    # try / allow simple extension test to override
    parts = urlparse(url)
    if parts.path:
        ext = os.path.splitext(parts.path)[1].lstrip(".").lower()
        if ext in ('html', 'js', 'css', 'gif', 'jpg', 'png', 'jpeg', 'mp3', 'ogg', 'ogv', 'webm', 'mp4', 'svg', 'webp'):
            return ext
    try:
        r = requests.head(url, verify=False)
        ct = r.headers['content-type'].split(";")[0]
        if ct not in EXT:
            print (f"Warning, unknown extension for content-type {ct}, using bin", file=sys.stderr)
        return EXT.get(ct, "bin")
    except Exception as e:
        print (f"Exception {url}: {e}", file=sys.stderr)
        return "bin"

def split_fragment(href):
    try:
        ri = href.rindex("#")
        return href[:ri], href[ri:]
    except ValueError:
        return href, ''

def split_query(href):
    try:
        ri = href.rindex("?")
        return href[:ri], href[ri:]
    except ValueError:
        return href, ''
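
A few quick, illustrative calls (the URLs are made up; ext_for only falls back to a HEAD request when the path carries no recognizable extension):

In [ ]:
split_fragment("notes.html#section-2")             # -> ('notes.html', '#section-2')
split_query("search.cgi?q=archive")                # -> ('search.cgi', '?q=archive')
ext_for("https://hub.xpub.nl/sandbox/style.css")   # -> 'css', decided from the path alone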

With all the helper functions in place, the actual spider loop is relatively straightforward: there's a todo list, and a set called done that remembers which URLs have already been downloaded.

2021-05-24: Added an optional encoding argument to force the encoding on HTML pages (requests sometimes guesses it wrong).
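
For reference, forcing the encoding just means overriding requests' guess before reading r.text (a minimal sketch; the URL is only an example):

In [ ]:
r = requests.get("https://hub.xpub.nl/sandbox/", verify=False)
r.encoding = "utf-8"   # override the detected encoding before accessing r.text
html = r.text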

In [56]:
class Spider:
    def __init__(self, output_path=".",
                 skip_existing_files=False,
                 pattern_flags=0,
                 verbose=False,
                 preserve_query=True,
                 additional_attributes=None,
                 encoding=None):
        self.verbose = verbose
        self.pattern_flags = pattern_flags
        self.patterns = []
        self.output_path = output_path
        self.skip_existing_files = skip_existing_files
        self.preserve_query = preserve_query
        self.additional_attributes = []
        self.encoding = encoding
        if additional_attributes:
            if type(additional_attributes) == str:
                self.additional_attributes.append(additional_attributes)
            else:
                self.additional_attributes.extend(additional_attributes)
        self.rewrites = []

    def add_pattern (self, search, replace, post_process=None):
        """ nb: the replace should always form a local path, with no query.
            As re.sub is used, the search should probably capture the entire string (^...$),
            otherwise unmatched trailing text can suddenly appear at the end.
            (Would this be a nicer way to allow queries to be preserved? ...but then the code
            would need to reparse the query in the local path.)
        """
        if type(search) == str:
            search = re.compile(search, flags=self.pattern_flags)
        self.patterns.append((search, replace, post_process))

    def sub_pattern (self, url):
        for psearch, preplace, post_process in self.patterns:
            m = psearch.search(url)
            if m:
                ret = psearch.sub(preplace, url)
                if post_process:
                    ret = post_process(ret)
                return ret

    def url_to_local_path (self, url):
        ret = self.sub_pattern(url)
        if ret:
            ret = urlunquote(ret)
        return ret

    def generic_url_to_path (self, url):
        md5hash = md5(url.encode()).hexdigest()
        parts = urlparse(url)
        ext = ext_for(url)
        return f"ext/{md5hash}.{ext}"

    def url_to_path (self, url):
        l = self.url_to_local_path(url)
        if l:
            return l
        else:
            return self.generic_url_to_path(url)

    def localize (self, url):
        if url not in self.done and url not in self.todo:
            self.todo.append(url)
        ret = self.url_to_path(url)
        # print (f"localize {url} => {ret}")
        return ret

    def should_localize(self, url):
        return self.url_to_local_path(url) is not None

    def relpath (self, to_file, from_file):
        return os.path.relpath(to_file, os.path.dirname(from_file))
            
    def download(self, url):
        path = self.url_to_path(url)
        usepath = os.path.join(self.output_path, path)
        if self.skip_existing_files and os.path.exists(usepath):
            if self.verbose:
                print ("File already exists, skipping...")
            return # why do I need to add this back ?! (2021-03-06)
        #if self.verbose:
        additional_attributes = []
        if self.additional_attributes:
            additional_attributes.extend(self.additional_attributes)
        all_attributes = ["href"] + additional_attributes
        self.rewrites.append((url, usepath))
        print (f"{url} => {usepath}")
        if os.path.dirname(usepath):
            os.makedirs(os.path.dirname(usepath), exist_ok=True)
        try:
            r = requests.get(url, verify=False)
            if r.headers["content-type"].startswith("text/html"):
                if self.encoding:
                    r.encoding = self.encoding
                t = html5lib.parse(r.text, namespaceHTMLElements=False)

                for elt in t.findall(".//*[@src]"):
                    src = urljoin(url, elt.attrib.get("src"))
                    # print (elt.tag, src, url_to_path(src))
                    local_link = self.localize(src)
                    elt.attrib["src"] = urlquote(self.relpath(local_link, path))
                for attribname in all_attributes:
                    for elt in t.findall(f".//*[@{attribname}]"):
                        href = urljoin(url, elt.attrib.get(attribname))
                        href, fragment = split_fragment(href)
                        if self.preserve_query:
                            href_noquery, query = split_query(href)
                        else:
                            query = ''
                        # print (elt.tag, href, url_to_path(href))
                        if (elt.tag == "link" and elt.attrib.get("rel") == "stylesheet") or \
                            (elt.tag == "a" and self.should_localize(href)) or \
                            (attribname in additional_attributes and self.should_localize(href)):
                            # localize: force/ensure download href, return local path
                            local_link = self.localize(href)
                            # need path == current document path
                            elt.attrib[attribname] = urlquote(self.relpath(local_link, path)) + query + fragment

                with open(usepath, "w") as fout:
                    print(ET.tostring(t, method="html", encoding="unicode"), file=fout)
            elif r.headers["content-type"] == "text/css":
                if self.encoding:
                    r.encoding = self.encoding
                src = r.text
                def css_sub(m):
                    href = urljoin(url, m.group(2))
                    if self.should_localize(href):
                        local_link = self.localize(href)
                        return "url("+m.group(1)+urlquote(self.relpath(local_link, path))+m.group(3)+")"
                    return m.group(0)                            
                newsrc = re.sub(r"""url\((['" ]*)(.+?)(['" ]*)\)""", css_sub, src)
                with open(usepath, "w") as fout:
                    print(newsrc, file=fout)
            else:
                # print ("Downloading binary...")
                with open(usepath, 'wb') as fd:
                    for chunk in r.iter_content(chunk_size=1024):
                        fd.write(chunk)
        except Exception as e:
            print (f"Exception {url}: {e}", file=sys.stderr)
 
    def spider (self, url):
        self.done = set()
        self.todo = [url]
        count = 0
        while self.todo:
            url = self.todo[0]
            self.todo = self.todo[1:]
            self.done.add(url)
            self.download(url)
            count += 1
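
The post_process argument of add_pattern isn't used below; as a hypothetical illustration, it could normalize the rewritten local paths (the archive name and lambda are made up):

In [ ]:
demo = Spider("demo_archive")
demo.add_pattern(r"^https?://hub\.xpub\.nl/sandbox/(.+)$", r"\g<1>",
                 post_process=lambda p: p.lower())   # e.g. lowercase every local path
demo.url_to_path("https://hub.xpub.nl/sandbox/Foo/Bar.HTML")   # -> 'foo/bar.html'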
In [11]:
# cleanup
!rm -rf sandbox_archive/
In [27]:
# remove all html
def rm_html (path):
    rmlist = []
    for root, dirs, files in os.walk(path):
        for f in files:
            if os.path.splitext(f)[1] == ".html":
                rmlist.append(os.path.join(root, f))
    for f in rmlist:
        print (f)
        os.remove(f)
In [ ]:
rm_html("sandbox_archive")
In [57]:
spider = Spider("sandbox_archive", skip_existing_files=True, additional_attributes="data-url", encoding="utf-8")
spider.add_pattern(r"^https?://hub\.xpub\.nl/sandbox/$", "index.html")
spider.add_pattern(r"^https?://hub\.xpub\.nl/sandbox/(.+)$", r"\g<1>")
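
With these two patterns the rewrites work out roughly as follows (illustrative, not executed here):

In [ ]:
spider.url_to_path("https://hub.xpub.nl/sandbox/")                      # -> 'index.html'
spider.url_to_path("https://hub.xpub.nl/sandbox/2020/project/a.html")   # -> '2020/project/a.html'
spider.url_to_path("https://example.com/elsewhere/image.png")           # -> 'ext/<md5 hash>.png'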
In [ ]:
spider.spider("https://hub.xpub.nl/sandbox/")