from __future__ import print_function
import os, sys, re, urllib, urlparse, html5lib, json
from PIL import Image
from math import log
from argparse import ArgumentParser
from urllib2 import urlopen
from xml.etree import ElementTree as ET
# from wiki_get_html import page_html
from mwclient import Site
from mwclient.page import Page
from mwclient.errors import APIError
from leaflet import tiles_wrapper, recursiverender, gridrender, html
from imagetile2 import tile_image
from urllib import quote as urlquote
def wget(url, path, blocksize=4*1000):
    if type(url) == unicode:
        url = url.encode("utf-8")
    count = 0
    with open(path, "wb") as fout:
        fin = urlopen(url)
        while True:
            data = fin.read(blocksize)
            if not data:
                break
            fout.write(data)
            count += len(data)
    return count
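
# Minimal usage sketch for wget (URL and local path are made up for illustration):
#   nbytes = wget("https://example.org/some-image.png", "wiki/some-image.png")
#   print(nbytes, "bytes written", file=sys.stderr)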
def page_url(site, page):
    # print ("[page_url]", page.name, file=sys.stderr)
    base = os.path.split(site.site['base'])[0]
    path = page.normalize_title(page.name)
    if type(path) == unicode:
        path = path.encode("utf-8")
    path = urlquote(path)
    uret = os.path.join(base, path)
    # assert type(uret) == str
    return uret

def wiki_url_to_title(url):
    return urllib.unquote(url.split("/")[-1])
def parse_gallery(t):
    """ returns [(imagepageurl, caption, articleurl), ...] """
    galleryitems = t.findall(".//li[@class='gallerybox']")
    items = []
    for i in galleryitems:
        image_link = i.find(".//a[@class='image']")
        src = None
        captiontext = None
        article = None
        if image_link is not None:
            src = image_link.attrib.get("href")
            # src = src.split("/")[-1]
        caption = i.find(".//*[@class='gallerytext']")
        if caption is not None:
            captiontext = ET.tostring(caption, method="html")
            articlelink = caption.find(".//a")
            if articlelink is not None:
                article = articlelink.attrib.get("href")
        # f = wiki.Pages[imgname]
        # items.append((f.imageinfo['url'], captiontext))
        items.append((src, captiontext, article))
    return items
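
# parse_gallery expects MediaWiki gallery markup roughly of this shape (illustrative sketch only):
#   <ul class="gallery"><li class="gallerybox">
#     <a class="image" href="/mediadesign/File:Example.png">...</a>
#     <div class="gallerytext"><p><a href="/mediadesign/Example">caption</a></p></div>
#   </li></ul>
# and would then yield [("/mediadesign/File:Example.png", "<div ...>...</div>", "/mediadesign/Example")].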
def mwfilepage_to_url(wiki, url):
    filename = urllib.unquote(url.split("/")[-1])
    page = wiki.Pages[filename]
    return page, page.imageinfo['url']

def url_to_path(url):
    """ https://pzwiki.wdka.nl/mediadesign/File:I-could-have-written-that_these-are-the-words_mb_300dpi.png """
    path = urllib.unquote(urlparse.urlparse(url).path)
    return "/".join(path.split("/")[3:])

def wiki_absurl(wiki, url):
    ret = ''
    if type(wiki.host) == tuple:
        ret = wiki.host[0] + "://" + wiki.host[1]
    else:
        ret = "http://" + wiki.host
    return urlparse.urljoin(ret, url)
def wiki_title_to_url(wiki, title):
    """ relies on wiki.site['base'] being set to the public facing URL of the Main page """
    ret = ''
    parts = urlparse.urlparse(wiki.site['base'])
    base, main_page = os.path.split(parts.path)
    ret = parts.scheme + "://" + parts.netloc + base
    p = wiki.pages[title]
    ret += "/" + p.normalize_title(p.name)
    return ret
def ensure_wiki_image_tiles(wiki, imagepageurl, text='', basepath="tiles", force=False, bgcolor=None, tilewidth=256, tileheight=256, zoom=3, margin_right=0, margin_bottom=0):
    print("ensure_wiki_image_tiles", imagepageurl, file=sys.stderr)
    page, imageurl = mwfilepage_to_url(wiki, imagepageurl)
    path = os.path.join(basepath, url_to_path(imageurl))
    print("imageurl, path", imageurl, path, file=sys.stderr)
    ret = tiles_wrapper(path, imagepageurl, text=text)
    tp = ret.get_tile_path(0, 0, 0)
    if os.path.exists(tp) and not force:
        return ret
    try:
        os.makedirs(path)
    except OSError:
        pass
    im = Image.open(urlopen(imageurl))
    tile_image(im, zoom, tilewidth, tileheight, path + "/", ret.tilename, bgcolor, margin_right, margin_bottom)
    return ret
def textcell(paras):
    node = {}
    node['text'] = paras[:1]
    moretext = paras[1:]
    if moretext:
        node['children'] = [textcell([x]) for x in moretext]
    return node
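
# For example, textcell(["<p>a</p>", "<p>b</p>", "<p>c</p>"]) returns:
#   {'text': ["<p>a</p>"],
#    'children': [{'text': ["<p>b</p>"]}, {'text': ["<p>c</p>"]}]}
# i.e. the first paragraph becomes the cell text and each remaining paragraph becomes a child cell.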
def name_to_path(name):
    return name.replace("/", "_")
def render_article(wiki, ref, basepath="tiles", depth=0, maxdepth=3, tilewidth=256, tileheight=256):
    print("render_article", ref, file=sys.stderr)
    if type(ref) == Page:
        page = ref
        title = page.name
        ref = wiki_title_to_url(wiki, page.name)
    elif ref.startswith("http"):
        title = wiki_url_to_title(ref)
        page = wiki.pages[title]
    else:
        title = ref
        page = wiki.pages[title]
        ref = wiki_title_to_url(wiki, page.name)
    # pagetext = page.text()
    # print ("WIKI PARSE", title, file=sys.stderr)
    parse = wiki.parse(page=title)
    html = parse['text']['*']
    # print ("GOT HTML ", html, file=sys.stderr)
    tree = html5lib.parse(html, treebuilder="etree", namespaceHTMLElements=False)
    body = tree.find("./body")
    paras = []
    images = []
    imgsrcs = {}
    for c in body:
        if c.tag == "p":
            # filter out paras like <p><br></p> by checking the text-only render length
            ptext = ET.tostring(c, encoding="utf-8", method="text").strip()
            if len(ptext) > 0:
                ptext = ET.tostring(c, encoding="utf-8", method="html").strip()
                paras.append(ptext)
        elif c.tag == "ul" and c.attrib.get("class") is not None and "gallery" in c.attrib.get("class"):
            # print ("GALLERY")
            gallery = parse_gallery(c)
            # Ensure image is downloaded ... at least the 00 image...
            for src, caption, article in gallery:
                src = wiki_absurl(wiki, src)
                if src in imgsrcs:
                    continue
                imgsrcs[src] = True
                print("GalleryImage", src, caption, article, file=sys.stderr)
                # if article and depth < maxdepth:
                #     article = wiki_absurl(wiki, article)
                #     images.append(render_article(wiki, article, caption, basepath, depth+1, maxdepth))
                # else:
                images.append(ensure_wiki_image_tiles(wiki, src, caption, basepath, tilewidth=tilewidth, tileheight=tileheight).zoom())
    for a in body.findall('.//a[@class="image"]'):
        caption = a.attrib.get("title", '')
        src = wiki_absurl(wiki, a.attrib.get("href"))
        # OEI... skipping svg for the moment (can't go straight to PIL)
        if src.endswith(".svg"):
            continue
        print(u"Image_link {0}: '{1}'".format(src, caption).encode("utf-8"), file=sys.stderr)
        if src in imgsrcs:
            continue
        imgsrcs[src] = True
        images.append(ensure_wiki_image_tiles(wiki, src, caption, basepath, tilewidth=tilewidth, tileheight=tileheight).zoom())
    print("{0} paras, {1} images".format(len(paras), len(images)), file=sys.stderr)
    if title is None:
        title = page.name
    basename = "tiles/" + name_to_path(page.name)
    # gallerynode = gridrender(images, basename)
    # return gallerynode
    cells = []
    if len(paras) > 0:
        cells.append(textcell(paras))
    cells.extend(images)
    ret = recursiverender(cells, basename, tilewidth=tilewidth, tileheight=tileheight)
    ret['text'] = u"""<p class="caption"><a class="url" href="{1}">{0}</a></p>""".format(title, ref)
    if images:
        ret['image'] = images[0]['image']
    return ret
# article = {}
# article['text'] = title
# article['children'] = children = []
# children.append(textcell(paras))
# for iz in images[:2]:
# if 'image' not in article and 'image' in iz:
# article['image'] = iz['image']
# children.append(iz)
# restimages = images[2:]
# if len(restimages) == 1:
# children.append(restimages[0])
# elif len(restimages) > 1:
# children.append(gridrender(restimages, basename))
# return article
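
# render_article returns the node dict produced by recursiverender; roughly (illustrative only,
# the exact fields depend on the leaflet module):
#   {'text': '<p class="caption"><a class="url" href="...">Title</a></p>',
#    'image': '...',          # copied from the first image cell, when there is one
#    'children': [...]}       # text cell followed by the image cells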
def render_category(wiki, cat, output="tiles", tilewidth=256, tileheight=256):
    print("Render Category", cat, file=sys.stderr)
    # if type(cat) == Page:
    #     page = ref
    #     title = page.name
    #     ref = wiki_title_to_url(wiki, page.name)
    if cat.startswith("http"):
        title = wiki_url_to_title(cat)
        cat = wiki.pages[title]
    else:
        title = cat
        cat = wiki.pages[cat]
    # ref = wiki_title_to_url(wiki, cat.name)
    print("cat", cat, file=sys.stderr)
    pages = []
    for m in cat.members():
        pages.append(m)
    pages.sort(key=lambda x: x.name)
    pagenodes = [render_article(wiki, x.name, tilewidth=tilewidth, tileheight=tileheight) for x in pages]
    for page, node in zip(pages, pagenodes):
        node['text'] = u"""<p class="caption"><a class="url" href="{1}">{0}</a></p>""".format(page.name, wiki_title_to_url(wiki, page.name))
    ret = gridrender(pagenodes, output + "/" + cat.name.replace(":", "_"), tilewidth=tilewidth, tileheight=tileheight)
    ret['text'] = u"""<p class="caption"><a class="url" href="{0}">{1}</a></p>""".format(wiki_title_to_url(wiki, cat.name), cat.name)
    return ret
# for p in pages:
# print (p.name, wiki_title_to_url(wiki, p.name))
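
# Hypothetical usage sketch for render_category (the category name is made up):
#   wiki = Site(("https", "pzwiki.wdka.nl"), path="/mw-mediadesign/")
#   node = render_category(wiki, "Category:Example")
#   print(json.dumps(node, indent=2))
# make_category below does essentially this, driven by the command-line arguments.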
def make_category(args):
    wiki = Site((args.wikiprotocol, args.wikihost), path=args.wikipath)
    root_node = render_category(wiki, args.category)
    if args.html:
        print(html(root_node, ""))
    else:
        print(json.dumps(root_node, indent=2))

def make_article(args):
    wiki = Site((args.wikiprotocol, args.wikihost), path=args.wikipath)
    root_node = render_article(wiki, args.wikipage, tilewidth=args.tilewidth, tileheight=args.tileheight)
    if args.html:
        print(html(root_node, ""))
    else:
        print(json.dumps(root_node, indent=2))
def make_gallery(args):
    wiki = Site((args.wikiprotocol, args.wikihost), path=args.wikipath)
    # apiurl = args.wikiprotocol+"://"+args.wikihost+args.wikipath+"api.php"
    if len(args.wikipage) == 1:
        root_node = render_article(wiki, args.wikipage[0], tilewidth=args.tilewidth, tileheight=args.tileheight)
    else:
        children = []
        for wikipage in args.wikipage:
            print("rendering", wikipage, file=sys.stderr)
            if "Category:" in wikipage:
                cnode = render_category(wiki, wikipage, args.output)
            else:
                cnode = render_article(wiki, wikipage, tilewidth=args.tilewidth, tileheight=args.tileheight)
            children.append(cnode)
        if args.recursive:
            root_node = recursiverender(children, args.output + "/" + args.name, direction=1, tilewidth=args.tilewidth, tileheight=args.tileheight)
        else:
            root_node = gridrender(children, args.output + "/" + args.name, direction=1, tilewidth=args.tilewidth, tileheight=args.tileheight)
    if args.html:
        print(html(root_node, ""))
    else:
        print(json.dumps(root_node, indent=2))
from time import sleep
def testwiki(args):
    site = Site((args.wikiprotocol, args.wikihost), path=args.wikipath)
    return site
USER_NS = 2
def imageinfo_with_thumbnail(site, name):
    d = site.api(
        "query",
        titles=name,
        prop="imageinfo",
        iiprop="url|mime",
        iiurlwidth=1024
    )
    pp = d['query']['pages']
    for key in pp:
        return pp[key]['imageinfo'][0]
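
# The query above returns, schematically (values are illustrative):
#   {"query": {"pages": {"1234": {"imageinfo": [{"url": "...", "mime": "image/png",
#                                                 "thumburl": "...", "thumbwidth": 1024, ...}]}}}}
# so the function simply returns the imageinfo dict of the first (and only) page in the result.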
def recentfiles(args):
    # open connection to wiki
    wiki = Site((args.wikiprotocol, args.wikihost), path=args.wikipath)
    # Prepare user list to filter (if args.usercategory)
    filter_by_users = None
    if args.usercategory:
        filter_by_users = set()
        usercategory = wiki.categories.get(args.usercategory)
        for p in usercategory.members():
            if p.namespace == USER_NS:
                filter_by_users.add(p.page_title)
    # Load args.json for oldest timestamp
    last_date = None
    if args.json:
        try:
            with open(args.json) as f:
                print("Reading {0}".format(args.json), file=sys.stderr)
                for line in f:
                    data = json.loads(line)
                    if 'date' in data:
                        last_date = data['date']
        except IOError as e:
            pass
    # Prepare the query arguments
    qargs = {
        'list': "allimages",
        'ailimit': 50,
        'aisort': 'timestamp',
        'aidir': 'descending',
        'aiprop': "timestamp|url|user|userid"
    }
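    # Each row in the 'allimages' result looks roughly like (values illustrative):
    #   {"name": "...", "title": "File:...", "timestamp": "2018-01-01T12:00:00Z",
    #    "url": "https://.../images/...", "user": "...", "userid": 123}
    # i.e. enough to filter by user and timestamp before fetching the full imageinfo per file.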
    if args.oldest:
        qargs['aiend'] = args.oldest
    if last_date:
        print("Using aiend {0}".format(last_date), file=sys.stderr)
        qargs['aiend'] = last_date
    count = 0  # used to satisfy --limit when given
    skipped_users = set()  # nicety for outputting names only once when skipped
    items_to_output = []
    # LOOP for continuing queries as needed
    while True:
        qq = wiki.api('query', **qargs)
        # print ("Got {0} results".format(len(qq['query']['allimages'])), file=sys.stderr)
        results = qq['query']['allimages']
        for r in results:
            # Filter on user
            if filter_by_users is not None:
                if r['user'] not in filter_by_users:
                    if r['user'] not in skipped_users:
                        print("Skipping user {0}".format(r['user']), file=sys.stderr)
                        skipped_users.add(r['user'])
                    continue
            try:
                # Filter on mime type (image/*)
                filepage = wiki.pages.get(r['title'])
                # mwclient's imageinfo doesn't have mime (or thumbnail info)
                # imageinfo = filepage.imageinfo
                imageinfo = imageinfo_with_thumbnail(wiki, r['title'])
                if not imageinfo['mime'].startswith("image/"):
                    print(u"Skipping non image ({0}) {1}".format(imageinfo['mime'], r['title']).encode("utf-8"), file=sys.stderr)
                    continue
                if 'thumburl' not in imageinfo:
                    print(u"Skipping item with no thumburl {0}".format(r['title']).encode("utf-8"), file=sys.stderr)
                    continue
                # Deal with the edge case where items == aiend are returned
                if last_date and r['timestamp'] == last_date:
                    # print ("SKIPPING AIEND item", file=sys.stderr)
                    break
                # Construct an item for output
                print(u"[{0}], date: {1}".format(filepage.page_title, r['timestamp']).encode("utf-8"), file=sys.stderr)
                usagepage = None
                for usagepage in filepage.imageusage():
                    break  # just grab the first usage page
                # url: local path to file
                # imageurl = imageinfo['url']
                imageurl = imageinfo['thumburl']
                localpath = imageurl.replace("https://pzwiki.wdka.nl/mw-mediadesign/images/", "wiki/")
                # wget image from wiki to local folder
                if not os.path.exists(localpath):
                    try:
                        os.makedirs(os.path.split(localpath)[0])
                    except OSError:
                        pass
                    print("downloading {0} to {1}".format(imageurl, localpath), file=sys.stderr)
                    wget(imageurl, localpath)
                item = {}
                item['url'] = localpath
                item['date'] = r['timestamp']
                item['src'] = page_url(wiki, filepage)
                userpage = wiki.pages.get('User:' + r['user'])
                if usagepage:
                    item['text'] = u'<a href="{0}">{1}</a><br>Uploaded by <a href="{2}">{3}</a>'.format(
                        page_url(wiki, usagepage),
                        usagepage.page_title,
                        page_url(wiki, userpage),
                        r['user']).encode("utf-8")
                else:
                    item['text'] = u'<a href="{0}">{1}</a><br>Uploaded by <a href="{2}">{3}</a>'.format(
                        page_url(wiki, filepage),
                        filepage.page_title,
                        page_url(wiki, userpage),
                        r['user']).encode("utf-8")
                # print (json.dumps(item))
                items_to_output.append(item)
                # honor --limit
                count += 1
                if args.limit and count == args.limit:
                    break
            except APIError as e:
                print("Error {0}, skipping".format(e), file=sys.stderr)
        if args.limit and count == args.limit:
            break
        # continue the query if possible (pre-loop)...
        if 'continue' in qq:
            qargs['aicontinue'] = qq['continue']['aicontinue']
        else:
            # we've reached the end of the query data
            break
    # OUTPUT RESULTS
    # reverse to be chronological
    items_to_output.reverse()
    if args.json:
        with open(args.json, "a") as f:
            for x in items_to_output:
                print(json.dumps(x), file=f)
    else:
        for x in items_to_output:
            print(json.dumps(x))
if __name__ == "__main__":
    ap = ArgumentParser("")
    ap.add_argument("--wikiprotocol", default="https")
    ap.add_argument("--wikihost", default="pzwiki.wdka.nl")
    ap.add_argument("--wikipath", default="/mw-mediadesign/")
    ap.add_argument("--wikishortpath", default="/mediadesign/")
    ap.add_argument("--tilewidth", type=int, default=256)
    ap.add_argument("--tileheight", type=int, default=256)
    # ap.add_argument("--zoom", type=int, default=3)
    ap.add_argument("--output", default="tiles")
    # ap.add_argument("--title", default="TITLE")
    subparsers = ap.add_subparsers(help='sub-command help')

    ap_article = subparsers.add_parser('article', help='Render an article')
    ap_article.add_argument("wikipage")
    ap_article.add_argument("--html", default=False, action="store_true")
    ap_article.set_defaults(func=make_article)

    ap_gallery = subparsers.add_parser('gallery', help='Render a gallery of articles')
    ap_gallery.add_argument("wikipage", nargs="+")
    ap_gallery.add_argument("--html", default=False, action="store_true")
    ap_gallery.add_argument("--recursive", default=False, action="store_true")
    ap_gallery.add_argument("--direction", type=int, default=3, help="cell to recursively expand into, 0-3, default: 3 (bottom-right)")
    ap_gallery.add_argument("--name", default=None)
    ap_gallery.set_defaults(func=make_gallery)

    ap_testwiki = subparsers.add_parser('testwiki', help='Connect to the wiki and return the Site object (for testing)')
    ap_testwiki.set_defaults(func=testwiki)

    ap_category = subparsers.add_parser('category', help='Render a category')
    ap_category.add_argument("category")
    ap_category.add_argument("--html", default=False, action="store_true")
    ap_category.set_defaults(func=make_category)

    ap_recentfiles = subparsers.add_parser('recentfiles', help='Incrementally update a json file with information about recent files')
    ap_recentfiles.add_argument("--usercategory", help="limit to activity by users that are members of this category")
    ap_recentfiles.add_argument("--limit", type=int, help="limit")
    ap_recentfiles.add_argument("--oldest", default=None, help="No results earlier than this timestamp (e.g. 2018-01-01T00:00:00Z)")
    ap_recentfiles.add_argument("--json", default=None, help="Use this json file as both input (to check the last timestamp) and output -- append results chronologically as a json-stream.")
    ap_recentfiles.set_defaults(func=recentfiles)

    args = ap.parse_args()
    ret = args.func(args)
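
# Example invocations (page and category names are hypothetical, and the script
# name wiki2tiles.py is a placeholder for whatever this module is saved as):
#   python wiki2tiles.py article "Some Wiki Page" --html > article.html
#   python wiki2tiles.py gallery "Page One" "Category:Some Category" --name gallery > gallery.json
#   python wiki2tiles.py category "Category:Some Category" > category.json
#   python wiki2tiles.py recentfiles --usercategory "Some User Category" --limit 10 --json recentfiles.json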