Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  convwatch.py   Sprache: Python

 
# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
#
# This file is part of the LibreOffice project.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Conversion watch, initially intended to detect if document layout changed since the last time it was run.
#
# Print a set of docs, compare the pdf against the old run and highlight the differences
#

import getopt
import os
import subprocess
import sys
import time
import uuid
import datetime
import traceback
import threading
try:
    from urllib.parse import quote
except ImportError:
    from urllib import quote

try:
    import pyuno
    import uno
    import unohelper
except ImportError:
    print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
    print("PYTHONPATH=/installation/opt/program")
    print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
    raise

try:
    from com.sun.star.document import XDocumentEventListener
except ImportError:
    print("UNO API class not found: try to set URE_BOOTSTRAP variable")
    print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
    raise

### utilities ###

def log(*args):
    print(*args, flush=True)

def partition(list, pred):
    left = []
    right = []
    for e in list:
        if pred(e):
            left.append(e)
        else:
            right.append(e)
    return (left, right)

def filelist(dir, suffix):
    if len(dir) == 0:
        raise Exception("filelist: empty directory")
    if not(dir[-1] == "/"):
        dir += "/"
    files = [dir + f for f in os.listdir(dir)]
#    log(files)
    return [f for f in files
                    if os.path.isfile(f) and os.path.splitext(f)[1] == suffix]

def getFiles(dirs, suffix):
    files = []
    for dir in dirs:
        files += filelist(dir, suffix)
    return files

### UNO utilities ###

class OfficeConnection:
    def __init__(self, args):
        self.args = args
        self.soffice = None
        self.socket = None
        self.xContext = None
    def setUp(self):
        (method, sep, rest) = self.args["--soffice"].partition(":")
        if sep != ":":
            raise Exception("soffice parameter does not specify method")
        if method == "path":
                self.socket = "pipe,name=pytest" + str(uuid.uuid1())
                try:
                    userdir = self.args["--userdir"]
                except KeyError:
                    raise Exception("'path' method requires --userdir")
                if not(userdir.startswith("file://")):
                    raise Exception("--userdir must be file URL")
                self.soffice = self.bootstrap(rest, userdir, self.socket)
        elif method == "connect":
                self.socket = rest
        else:
            raise Exception("unsupported connection method: " + method)
        self.xContext = self.connect(self.socket)

    def bootstrap(self, soffice, userdir, socket):
        argv = [ soffice, "--accept=" + socket + ";urp",
                "-env:UserInstallation=" + userdir,
                "--quickstart=no",
                "--norestore""--nologo""--headless" ]
        if "--valgrind" in self.args:
            argv.append("--valgrind")
        return subprocess.Popen(argv)

    def connect(self, socket):
        xLocalContext = uno.getComponentContext()
        xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
                "com.sun.star.bridge.UnoUrlResolver", xLocalContext)
        url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
        log("OfficeConnection: connecting to: " + url)
        while True:
            try:
                xContext = xUnoResolver.resolve(url)
                return xContext
#            except com.sun.star.connection.NoConnectException
            except pyuno.getClass("com.sun.star.connection.NoConnectException"):
                log("NoConnectException: sleeping...")
                time.sleep(1)

    def tearDown(self):
        if self.soffice:
            if self.xContext:
                try:
                    log("tearDown: calling terminate()...")
                    xMgr = self.xContext.ServiceManager
                    xDesktop = xMgr.createInstanceWithContext(
                            "com.sun.star.frame.Desktop", self.xContext)
                    xDesktop.terminate()
                    log("...done")
#                except com.sun.star.lang.DisposedException:
                except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"):
                    log("caught UnknownPropertyException")
                    pass # ignore, also means disposed
                except pyuno.getClass("com.sun.star.lang.DisposedException"):
                    log("caught DisposedException")
                    pass # ignore
            else:
                self.soffice.terminate()
            ret = self.soffice.wait()
            self.xContext = None
            self.socket = None
            self.soffice = None
            if ret != 0:
                raise Exception("Exit status indicates failure: " + str(ret))
#            return ret

class WatchDog(threading.Thread):
    def __init__(self, connection):
        threading.Thread.__init__(self, name="WatchDog " + connection.socket)
        self.connection = connection
    def run(self):
        try:
            if self.connection.soffice: # not possible for "connect"
                self.connection.soffice.wait(timeout=120) # 2 minutes?
        except subprocess.TimeoutExpired:
            log("WatchDog: TIMEOUT -> killing soffice")
            self.connection.soffice.terminate() # actually killing oosplash...
            self.connection.xContext = None
            log("WatchDog: killed soffice")

class PerTestConnection:
    def __init__(self, args):
        self.args = args
        self.connection = None
        self.watchdog = None
    def getContext(self):
        return self.connection.xContext
    def setUp(self):
        assert(not(self.connection))
    def preTest(self):
        conn = OfficeConnection(self.args)
        conn.setUp()
        self.connection = conn
        self.watchdog = WatchDog(self.connection)
        self.watchdog.start()
    def postTest(self):
        if self.connection:
            try:
                self.connection.tearDown()
            finally:
                self.connection = None
                self.watchdog.join()
    def tearDown(self):
        assert(not(self.connection))

class PersistentConnection:
    def __init__(self, args):
        self.args = args
        self.connection = None
    def getContext(self):
        return self.connection.xContext
    def setUp(self):
        conn = OfficeConnection(self.args)
        conn.setUp()
        self.connection = conn
    def preTest(self):
        assert(self.connection)
    def postTest(self):
        assert(self.connection)
    def tearDown(self):
        if self.connection:
            try:
                self.connection.tearDown()
            finally:
                self.connection = None

def simpleInvoke(connection, test):
    try:
        connection.preTest()
        test.run(connection.getContext())
    finally:
        connection.postTest()

def retryInvoke(connection, test):
    tries = 5
    while tries > 0:
        try:
            tries -= 1
            try:
                connection.preTest()
                test.run(connection.getContext())
                return
            finally:
                connection.postTest()
        except KeyboardInterrupt:
            raise # Ctrl+C should work
        except Exception:
            log("retryInvoke: caught exception")
    raise Exception("FAILED retryInvoke")

def runConnectionTests(connection, invoker, tests):
    try:
        connection.setUp()
        failed = []
        for test in tests:
            try:
                invoker(connection, test)
            except KeyboardInterrupt:
                raise # Ctrl+C should work
            except Exception:
                failed.append(test.file)
                estr = traceback.format_exc()
                log("... FAILED with exception:\n" + estr)
        return failed
    finally:
        connection.tearDown()

class EventListener(XDocumentEventListener,unohelper.Base):
    def __init__(self):
        self.layoutFinished = False
    def documentEventOccured(self, event):
#        log(str(event.EventName))
        if event.EventName == "OnLayoutFinished":
            self.layoutFinished = True
    def disposing(event):
        pass

def mkPropertyValue(name, value):
    return uno.createUnoStruct("com.sun.star.beans.PropertyValue",
            name, 0, value, 0)

### tests ###

def loadFromURL(xContext, url):
    xDesktop = xContext.ServiceManager.createInstanceWithContext(
            "com.sun.star.frame.Desktop", xContext)
    props = [("Hidden"True), ("ReadOnly"True)] # FilterName?
    loadProps = tuple([mkPropertyValue(name, value) for (name, value) in props])
    xListener = EventListener()
    xGEB = xContext.getValueByName(
        "/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
    xGEB.addDocumentEventListener(xListener)
    xDoc = None
    try:
        xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
        log("...loadComponentFromURL done")
        if xDoc is None:
            raise Exception("No document loaded?")
        time_ = 0
        while time_ < 30:
            if xListener.layoutFinished:
                return xDoc
            log("delaying...")
            time_ += 1
            time.sleep(1)
        log("timeout: no OnLayoutFinished received")
        return xDoc
    except Exception:
        if xDoc:
            log("CLOSING")
            xDoc.close(True)
        raise
    finally:
        if xListener:
            xGEB.removeDocumentEventListener(xListener)

def printDoc(xContext, xDoc, url):
    props = [ mkPropertyValue("FileName", url) ]
# xDoc.print(props)
    uno.invoke(xDoc, "print", (tuple(props),)) # damn, that's a keyword!
    busy = True
    while busy:
        log("printing...")
        time.sleep(1)
        prt = xDoc.getPrinter()
        for value in prt:
            if value.Name == "IsBusy":
                busy = value.Value
    log("...done printing")

class LoadPrintFileTest:
    def __init__(self, file, prtsuffix):
        self.file = file
        self.prtsuffix = prtsuffix
    def run(self, xContext):
        start = datetime.datetime.now()
        log("Time: " + str(start) + " Loading document: " + self.file)
        xDoc = None
        try:
            if os.name == 'nt' and self.file[1] == ':':
                url = "file:///" + self.file[0:2] + quote(self.file[2:])
            else:
                url = "file://" + quote(self.file)
            xDoc = loadFromURL(xContext, url)
            log("loadFromURL in: "  + str(datetime.datetime.now() - start))
            printDoc(xContext, xDoc, url + self.prtsuffix)
        finally:
            if xDoc:
                xDoc.close(True)
            end = datetime.datetime.now()
            log("...done with: " + self.file + " in: " + str(end - start))

def runLoadPrintFileTests(opts, dirs, suffix, reference):
    if reference:
        prtsuffix = ".pdf.reference"
    else:
        prtsuffix = ".pdf"
    files = getFiles(dirs, suffix)
    tests = (LoadPrintFileTest(file, prtsuffix) for file in files)
#    connection = PersistentConnection(opts)
    connection = PerTestConnection(opts)
    failed = runConnectionTests(connection, simpleInvoke, tests)
    print("all printed: FAILURES: " + str(len(failed)))
    for fail in failed:
        print(fail)
    return failed

def mkImages(file, resolution):
    argv = [ "gs""-r" + resolution, "-sOutputFile=" + file + ".%04d.jpeg",
             "-dNOPROMPT""-dNOPAUSE""-dBATCH""-sDEVICE=jpeg", file ]
    subprocess.check_call(argv)

def mkAllImages(dirs, suffix, resolution, reference, failed):
    if reference:
        prtsuffix = ".pdf.reference"
    else:
        prtsuffix = ".pdf"
    for dir in dirs:
        files = filelist(dir, suffix)
        log(files)
        for f in files:
            if f in failed:
                log("Skipping failed: " + f)
            else:
                mkImages(f + prtsuffix, resolution)

def identify(imagefile):
    argv = ["identify""-format""%k", imagefile]
    process = subprocess.Popen(argv, stdout=subprocess.PIPE)
    result, _ = process.communicate()
    if process.wait() != 0:
        raise Exception("identify failed")
    if result.partition(b"\n")[0] != b"1":
        log("identify result: " + result.decode('utf-8'))
        log("DIFFERENCE in " + imagefile)

def compose(refimagefile, imagefile, diffimagefile):
    argv = [ "composite""-compose""difference",
            refimagefile, imagefile, diffimagefile ]
    subprocess.check_call(argv)

def compareImages(file):
    allimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
                   if f.startswith(file)]
#    refimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
#                   if f.startswith(file + ".reference")]
#    log("compareImages: allimages:" + str(allimages))
    (refimages, images) = partition(sorted(allimages),
            lambda f: f.startswith(file + ".pdf.reference"))
#    log("compareImages: images" + str(images))
    for (image, refimage) in zip(images, refimages):
        compose(image, refimage, image + ".diff")
        identify(image + ".diff")
    if (len(images) != len(refimages)):
        log("DIFFERENT NUMBER OF IMAGES FOR: " + file)

def compareAllImages(dirs, suffix):
    log("compareAllImages...")
    for dir in dirs:
        files = filelist(dir, suffix)
#        log("compareAllImages:" + str(files))
        for f in files:
            compareImages(f)
    log("...compareAllImages done")


def parseArgs(argv):
    (optlist,args) = getopt.getopt(argv[1:], "hr",
            ["help""soffice=""userdir=""reference""valgrind"])
#    print optlist
    return (dict(optlist), args)

def usage():
    message = """usage: {program} [option]... [directory]..."
 -h | --help:      print usage information
 -r | --reference: generate new reference files (otherwise: compare)
 --soffice=method:location
                   specify soffice instance to connect to
                   supported methods: 'path''connect'
 --userdir=URL     specify user installation directory for 'path' method
 --valgrind        pass --valgrind to soffice for 'path' method"""
    print(message.format(program = os.path.basename(sys.argv[0])))

def checkTools():
    try:
        subprocess.check_output(["gs""--version"])
    except Exception:
        print("Cannot execute 'gs'. Please install ghostscript.")
        sys.exit(1)
    try:
        subprocess.check_output(["composite""-version"])
        subprocess.check_output(["identify""-version"])
    except Exception:
        print("Cannot execute 'composite' or 'identify'.")
        print("Please install ImageMagick.")
        sys.exit(1)

if __name__ == "__main__":
    checkTools()
    (opts,args) = parseArgs(sys.argv)
    if len(args) == 0:
        usage()
        sys.exit(1)
    if "-h" in opts or "--help" in opts:
        usage()
        sys.exit()
    elif "--soffice" in opts:
        reference = "-r" in opts or "--reference" in opts
        failed = runLoadPrintFileTests(opts, args, ".odt", reference)
        mkAllImages(args, ".odt""200", reference, failed)
        if not(reference):
            compareAllImages(args, ".odt")
    else:
        usage()
        sys.exit(1)

# vim: set shiftwidth=4 softtabstop=4 expandtab:

85%


¤ Dauer der Verarbeitung: 0.16 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge