# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
#
# This file is part of the LibreOffice project.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Conversion watch, initially intended to detect if document layout changed since the last time it was run.
#
# Print a set of docs, compare the pdf against the old run and highlight the differences
#
import getopt
import os
import subprocess
import sys
import time
import uuid
import datetime
import traceback
import threading
try:
from urllib.parse
import quote
except ImportError:
from urllib
import quote
try:
import pyuno
import uno
import unohelper
except ImportError:
print(
"pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
print(
"PYTHONPATH=/installation/opt/program")
print(
"URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
raise
try:
from com.sun.star.document
import XDocumentEventListener
except ImportError:
print(
"UNO API class not found: try to set URE_BOOTSTRAP variable")
print(
"URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
raise
### utilities ###
def log(*args):
print(*args, flush=
True)
def partition(list, pred):
left = []
right = []
for e
in list:
if pred(e):
left.append(e)
else:
right.append(e)
return (left, right)
def filelist(dir, suffix):
if len(dir) == 0:
raise Exception(
"filelist: empty directory")
if not(dir[-1] ==
"/"):
dir +=
"/"
files = [dir + f
for f
in os.listdir(dir)]
# log(files)
return [f
for f
in files
if os.path.isfile(f)
and os.path.splitext(f)[1] == suffix]
def getFiles(dirs, suffix):
files = []
for dir
in dirs:
files += filelist(dir, suffix)
return files
### UNO utilities ###
class OfficeConnection:
def __init__(self, args):
self.args = args
self.soffice =
None
self.socket =
None
self.xContext =
None
def setUp(self):
(method, sep, rest) = self.args[
"--soffice"].partition(
":")
if sep !=
":":
raise Exception(
"soffice parameter does not specify method")
if method ==
"path":
self.socket =
"pipe,name=pytest" + str(uuid.uuid1())
try:
userdir = self.args[
"--userdir"]
except KeyError:
raise Exception(
"'path' method requires --userdir")
if not(userdir.startswith(
"file://")):
raise Exception(
"--userdir must be file URL")
self.soffice = self.bootstrap(rest, userdir, self.socket)
elif method ==
"connect":
self.socket = rest
else:
raise Exception(
"unsupported connection method: " + method)
self.xContext = self.connect(self.socket)
def bootstrap(self, soffice, userdir, socket):
argv = [ soffice,
"--accept=" + socket +
";urp",
"-env:UserInstallation=" + userdir,
"--quickstart=no",
"--norestore",
"--nologo",
"--headless" ]
if "--valgrind" in self.args:
argv.append(
"--valgrind")
return subprocess.Popen(argv)
def connect(self, socket):
xLocalContext = uno.getComponentContext()
xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
"com.sun.star.bridge.UnoUrlResolver", xLocalContext)
url =
"uno:" + socket +
";urp;StarOffice.ComponentContext"
log(
"OfficeConnection: connecting to: " + url)
while True:
try:
xContext = xUnoResolver.resolve(url)
return xContext
# except com.sun.star.connection.NoConnectException
except pyuno.getClass(
"com.sun.star.connection.NoConnectException"):
log(
"NoConnectException: sleeping...")
time.sleep(1)
def tearDown(self):
if self.soffice:
if self.xContext:
try:
log(
"tearDown: calling terminate()...")
xMgr = self.xContext.ServiceManager
xDesktop = xMgr.createInstanceWithContext(
"com.sun.star.frame.Desktop", self.xContext)
xDesktop.terminate()
log(
"...done")
# except com.sun.star.lang.DisposedException:
except pyuno.getClass(
"com.sun.star.beans.UnknownPropertyException"):
log(
"caught UnknownPropertyException")
pass # ignore, also means disposed
except pyuno.getClass(
"com.sun.star.lang.DisposedException"):
log(
"caught DisposedException")
pass # ignore
else:
self.soffice.terminate()
ret = self.soffice.wait()
self.xContext =
None
self.socket =
None
self.soffice =
None
if ret != 0:
raise Exception(
"Exit status indicates failure: " + str(ret))
# return ret
class WatchDog(threading.Thread):
def __init__(self, connection):
threading.Thread.__init__(self, name=
"WatchDog " + connection.socket)
self.connection = connection
def run(self):
try:
if self.connection.soffice:
# not possible for "connect"
self.connection.soffice.wait(timeout=120)
# 2 minutes?
except subprocess.TimeoutExpired:
log(
"WatchDog: TIMEOUT -> killing soffice")
self.connection.soffice.terminate()
# actually killing oosplash...
self.connection.xContext =
None
log(
"WatchDog: killed soffice")
class PerTestConnection:
def __init__(self, args):
self.args = args
self.connection =
None
self.watchdog =
None
def getContext(self):
return self.connection.xContext
def setUp(self):
assert(
not(self.connection))
def preTest(self):
conn = OfficeConnection(self.args)
conn.setUp()
self.connection = conn
self.watchdog = WatchDog(self.connection)
self.watchdog.start()
def postTest(self):
if self.connection:
try:
self.connection.tearDown()
finally:
self.connection =
None
self.watchdog.join()
def tearDown(self):
assert(
not(self.connection))
class PersistentConnection:
def __init__(self, args):
self.args = args
self.connection =
None
def getContext(self):
return self.connection.xContext
def setUp(self):
conn = OfficeConnection(self.args)
conn.setUp()
self.connection = conn
def preTest(self):
assert(self.connection)
def postTest(self):
assert(self.connection)
def tearDown(self):
if self.connection:
try:
self.connection.tearDown()
finally:
self.connection =
None
def simpleInvoke(connection, test):
try:
connection.preTest()
test.run(connection.getContext())
finally:
connection.postTest()
def retryInvoke(connection, test):
tries = 5
while tries > 0:
try:
tries -= 1
try:
connection.preTest()
test.run(connection.getContext())
return
finally:
connection.postTest()
except KeyboardInterrupt:
raise # Ctrl+C should work
except Exception:
log(
"retryInvoke: caught exception")
raise Exception(
"FAILED retryInvoke")
def runConnectionTests(connection, invoker, tests):
try:
connection.setUp()
failed = []
for test
in tests:
try:
invoker(connection, test)
except KeyboardInterrupt:
raise # Ctrl+C should work
except Exception:
failed.append(test.file)
estr = traceback.format_exc()
log(
"... FAILED with exception:\n" + estr)
return failed
finally:
connection.tearDown()
class EventListener(XDocumentEventListener,unohelper.Base):
def __init__(self):
self.layoutFinished =
False
def documentEventOccured(self, event):
# log(str(event.EventName))
if event.EventName ==
"OnLayoutFinished":
self.layoutFinished =
True
def disposing(event):
pass
def mkPropertyValue(name, value):
return uno.createUnoStruct(
"com.sun.star.beans.PropertyValue",
name, 0, value, 0)
### tests ###
def loadFromURL(xContext, url):
xDesktop = xContext.ServiceManager.createInstanceWithContext(
"com.sun.star.frame.Desktop", xContext)
props = [(
"Hidden",
True), (
"ReadOnly",
True)]
# FilterName?
loadProps = tuple([mkPropertyValue(name, value)
for (name, value)
in props])
xListener = EventListener()
xGEB = xContext.getValueByName(
"/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
xGEB.addDocumentEventListener(xListener)
xDoc =
None
try:
xDoc = xDesktop.loadComponentFromURL(url,
"_blank", 0, loadProps)
log(
"...loadComponentFromURL done")
if xDoc
is None:
raise Exception(
"No document loaded?")
time_ = 0
while time_ < 30:
if xListener.layoutFinished:
return xDoc
log(
"delaying...")
time_ += 1
time.sleep(1)
log(
"timeout: no OnLayoutFinished received")
return xDoc
except Exception:
if xDoc:
log(
"CLOSING")
xDoc.close(
True)
raise
finally:
if xListener:
xGEB.removeDocumentEventListener(xListener)
def printDoc(xContext, xDoc, url):
props = [ mkPropertyValue(
"FileName", url) ]
# xDoc.print(props)
uno.invoke(xDoc,
"print", (tuple(props),))
# damn, that's a keyword!
busy =
True
while busy:
log(
"printing...")
time.sleep(1)
prt = xDoc.getPrinter()
for value
in prt:
if value.Name ==
"IsBusy":
busy = value.Value
log(
"...done printing")
class LoadPrintFileTest:
def __init__(self, file, prtsuffix):
self.file = file
self.prtsuffix = prtsuffix
def run(self, xContext):
start = datetime.datetime.now()
log(
"Time: " + str(start) +
" Loading document: " + self.file)
xDoc =
None
try:
if os.name ==
'nt' and self.file[1] ==
':':
url =
"file:///" + self.file[0:2] + quote(self.file[2:])
else:
url =
"file://" + quote(self.file)
xDoc = loadFromURL(xContext, url)
log(
"loadFromURL in: " + str(datetime.datetime.now() - start))
printDoc(xContext, xDoc, url + self.prtsuffix)
finally:
if xDoc:
xDoc.close(
True)
end = datetime.datetime.now()
log(
"...done with: " + self.file +
" in: " + str(end - start))
def runLoadPrintFileTests(opts, dirs, suffix, reference):
if reference:
prtsuffix =
".pdf.reference"
else:
prtsuffix =
".pdf"
files = getFiles(dirs, suffix)
tests = (LoadPrintFileTest(file, prtsuffix)
for file
in files)
# connection = PersistentConnection(opts)
connection = PerTestConnection(opts)
failed = runConnectionTests(connection, simpleInvoke, tests)
print(
"all printed: FAILURES: " + str(len(failed)))
for fail
in failed:
print(fail)
return failed
def mkImages(file, resolution):
argv = [
"gs",
"-r" + resolution,
"-sOutputFile=" + file +
".%04d.jpeg",
"-dNOPROMPT",
"-dNOPAUSE",
"-dBATCH",
"-sDEVICE=jpeg", file ]
subprocess.check_call(argv)
def mkAllImages(dirs, suffix, resolution, reference, failed):
if reference:
prtsuffix =
".pdf.reference"
else:
prtsuffix =
".pdf"
for dir
in dirs:
files = filelist(dir, suffix)
log(files)
for f
in files:
if f
in failed:
log(
"Skipping failed: " + f)
else:
mkImages(f + prtsuffix, resolution)
def identify(imagefile):
argv = [
"identify",
"-format",
"%k", imagefile]
process = subprocess.Popen(argv, stdout=subprocess.PIPE)
result, _ = process.communicate()
if process.wait() != 0:
raise Exception(
"identify failed")
if result.partition(b
"\n")[0] != b
"1":
log(
"identify result: " + result.decode(
'utf-8'))
log(
"DIFFERENCE in " + imagefile)
def compose(refimagefile, imagefile, diffimagefile):
argv = [
"composite",
"-compose",
"difference",
refimagefile, imagefile, diffimagefile ]
subprocess.check_call(argv)
def compareImages(file):
allimages = [f
for f
in filelist(os.path.dirname(file),
".jpeg")
if f.startswith(file)]
# refimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
# if f.startswith(file + ".reference")]
# log("compareImages: allimages:" + str(allimages))
(refimages, images) = partition(sorted(allimages),
lambda f: f.startswith(file +
".pdf.reference"))
# log("compareImages: images" + str(images))
for (image, refimage)
in zip(images, refimages):
compose(image, refimage, image +
".diff")
identify(image +
".diff")
if (len(images) != len(refimages)):
log(
"DIFFERENT NUMBER OF IMAGES FOR: " + file)
def compareAllImages(dirs, suffix):
log(
"compareAllImages...")
for dir
in dirs:
files = filelist(dir, suffix)
# log("compareAllImages:" + str(files))
for f
in files:
compareImages(f)
log(
"...compareAllImages done")
def parseArgs(argv):
(optlist,args) = getopt.getopt(argv[1:],
"hr",
[
"help",
"soffice=",
"userdir=",
"reference",
"valgrind"])
# print optlist
return (dict(optlist), args)
def usage():
message =
"""usage: {program} [option]... [directory]..."
-h | --help: print usage information
-r | --reference: generate new reference files (otherwise: compare)
--soffice=method:location
specify soffice instance to connect to
supported methods:
'path',
'connect'
--userdir=URL specify user installation directory
for 'path' method
--valgrind
pass --valgrind to soffice
for 'path' method
"""
print(message.format(program = os.path.basename(sys.argv[0])))
def checkTools():
try:
subprocess.check_output([
"gs",
"--version"])
except Exception:
print(
"Cannot execute 'gs'. Please install ghostscript.")
sys.exit(1)
try:
subprocess.check_output([
"composite",
"-version"])
subprocess.check_output([
"identify",
"-version"])
except Exception:
print(
"Cannot execute 'composite' or 'identify'.")
print(
"Please install ImageMagick.")
sys.exit(1)
if __name__ ==
"__main__":
checkTools()
(opts,args) = parseArgs(sys.argv)
if len(args) == 0:
usage()
sys.exit(1)
if "-h" in opts
or "--help" in opts:
usage()
sys.exit()
elif "--soffice" in opts:
reference =
"-r" in opts
or "--reference" in opts
failed = runLoadPrintFileTests(opts, args,
".odt", reference)
mkAllImages(args,
".odt",
"200", reference, failed)
if not(reference):
compareAllImages(args,
".odt")
else:
usage()
sys.exit(1)
# vim: set shiftwidth=4 softtabstop=4 expandtab: