Version 4.0.0.1, tag libreoffice-4.0.0.1
[LibreOffice.git] / bin / convwatch.py
blob f3fa55ad8cf91b69eb79819c6ea576bd2502c928
1 # -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
2 # Version: MPL 1.1 / GPLv3+ / LGPLv3+
4 # The contents of this file are subject to the Mozilla Public License Version
5 # 1.1 (the "License"); you may not use this file except in compliance with
6 # the License or as specified alternatively below. You may obtain a copy of
7 # the License at http://www.mozilla.org/MPL/
9 # Software distributed under the License is distributed on an "AS IS" basis,
10 # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 # for the specific language governing rights and limitations under the
12 # License.
14 # Major Contributor(s):
15 # Copyright (C) 2012 Red Hat, Inc., Michael Stahl <mstahl@redhat.com>
16 # (initial developer)
18 # All Rights Reserved.
20 # For minor contributions see the git repository.
22 # Alternatively, the contents of this file may be used under the terms of
23 # either the GNU General Public License Version 3 or later (the "GPLv3+"), or
24 # the GNU Lesser General Public License Version 3 or later (the "LGPLv3+"),
25 # in which case the provisions of the GPLv3+ or the LGPLv3+ are applicable
26 # instead of those above.
28 import getopt
29 import os
30 import subprocess
31 import sys
32 import time
33 import uuid
34 try:
35 from urllib.parse import quote
36 except ImportError:
37 from urllib import quote
39 try:
40 import pyuno
41 import uno
42 import unohelper
43 except ImportError:
44 print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
45 print("PYTHONPATH=/installation/opt/program")
46 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
47 raise
49 try:
50 from com.sun.star.document import XDocumentEventListener
51 except ImportError:
52 print("UNO API class not found: try to set URE_BOOTSTRAP variable")
53 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
54 raise
56 ### utilities ###
def partition(list, pred):
    """Split the elements of `list` in two according to `pred`.

    Returns a tuple (matching, rest): first the elements for which
    pred(e) is true, then all the others, each in original order.
    """
    matching = []
    rest = []
    for item in list:
        # pred is evaluated exactly once per element
        bucket = matching if pred(item) else rest
        bucket.append(item)
    return (matching, rest)
def filelist(dir, suffix):
    """Return the regular files directly inside `dir` ending in `suffix`.

    `suffix` is compared against the extension reported by
    os.path.splitext, so it includes the leading dot (e.g. ".odt").
    The returned paths are `dir` (with a trailing "/" added if it is
    missing) concatenated with the entry name.

    Raises Exception if `dir` is empty.
    """
    if not len(dir):
        raise Exception("filelist: empty directory")
    prefix = dir if dir.endswith("/") else dir + "/"
    matches = []
    for entry in os.listdir(prefix):
        path = prefix + entry
        # directories and other non-regular entries are skipped
        if os.path.isfile(path) and os.path.splitext(path)[1] == suffix:
            matches.append(path)
    return matches
def getFiles(dirs, suffix):
    """Collect the filelist() results of every directory in `dirs`.

    The result is a single flat list, in the order of `dirs`.
    """
    return [path for d in dirs for path in filelist(d, suffix)]
84 ### UNO utilities ###
class OfficeConnection:
    """A UNO connection to a soffice process.

    The "--soffice" entry of `args` has the form "method:location":
      path:<executable>  start a new soffice (requires "--userdir", a
                         file URL) and talk to it over a unique pipe
      connect:<socket>   attach to an soffice that is already listening
    """
    def __init__(self, args):
        self.args = args
        self.soffice = None   # subprocess.Popen of the soffice started here
        self.socket = None    # never assigned elsewhere in this class
        self.xContext = None  # remote component context once connected

    def setUp(self):
        """Start soffice if requested and connect to it.

        Raises Exception if the "--soffice" value has no "method:" part,
        names an unsupported method, or if the 'path' method is used
        without a "--userdir" file URL.
        """
        (method, sep, rest) = self.args["--soffice"].partition(":")
        if sep != ":":
            raise Exception("soffice parameter does not specify method")
        if method == "path":
            socket = "pipe,name=pytest" + str(uuid.uuid1())
            try:
                userdir = self.args["--userdir"]
            except KeyError:
                raise Exception("'path' method requires --userdir")
            if not(userdir.startswith("file://")):
                raise Exception("--userdir must be file URL")
            self.soffice = self.bootstrap(rest, userdir, socket)
        elif method == "connect":
            socket = rest
        else:
            raise Exception("unsupported connection method: " + method)
        self.xContext = self.connect(socket)

    def bootstrap(self, soffice, userdir, socket):
        """Launch the `soffice` executable headless, accepting on `socket`.

        Returns the subprocess.Popen object of the started process.
        """
        argv = [ soffice, "--accept=" + socket + ";urp",
                "-env:UserInstallation=" + userdir,
                "--quickstart=no", "--nofirststartwizard",
                "--norestore", "--nologo", "--headless" ]
        if "--valgrind" in self.args:
            argv.append("--valgrind")
        return subprocess.Popen(argv)

    def connect(self, socket):
        """Resolve the remote component context, retrying once a second.

        Raises Exception if the soffice started by this object exits
        before a connection could be established; without that check
        the retry loop would never terminate.
        """
        xLocalContext = uno.getComponentContext()
        xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
                "com.sun.star.bridge.UnoUrlResolver", xLocalContext)
        url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
        print("OfficeConnection: connecting to: " + url)
        while True:
            try:
                xContext = xUnoResolver.resolve(url)
                return xContext
#            except com.sun.star.connection.NoConnectException
            except pyuno.getClass("com.sun.star.connection.NoConnectException"):
                # poll() returns None while the child is still running
                if self.soffice and self.soffice.poll() is not None:
                    raise Exception(
                        "soffice exited before accepting a connection: "
                        + str(self.soffice.returncode))
                print("NoConnectException: sleeping...")
                time.sleep(1)

    def tearDown(self):
        """Shut down the soffice started by setUp(), if there is one.

        Nothing is done for an soffice that was only connected to.
        Raises Exception if the process exits with a non-zero status.
        """
        if self.soffice:
            if self.xContext:
                try:
                    print("tearDown: calling terminate()...")
                    xMgr = self.xContext.ServiceManager
                    xDesktop = xMgr.createInstanceWithContext(
                            "com.sun.star.frame.Desktop", self.xContext)
                    xDesktop.terminate()
                    print("...done")
#                except com.sun.star.lang.DisposedException:
                except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"):
                    print("caught UnknownPropertyException")
                    pass # ignore, also means disposed
                except pyuno.getClass("com.sun.star.lang.DisposedException"):
                    print("caught DisposedException")
                    pass # ignore
            else:
                # never connected: no way to ask politely, so signal it
                self.soffice.terminate()
            ret = self.soffice.wait()
            self.xContext = None
            self.socket = None
            self.soffice = None
            if ret != 0:
                raise Exception("Exit status indicates failure: " + str(ret))
#            return ret
class PerTestConnection:
    """Connection policy that uses a fresh OfficeConnection per test."""
    def __init__(self, args):
        self.connection = None
        self.args = args

    def getContext(self):
        """Return the component context of the current connection."""
        current = self.connection
        return current.xContext

    def setUp(self):
        """Nothing to prepare; only checks that no connection is open."""
        assert not self.connection

    def preTest(self):
        """Open and set up a new connection for the upcoming test."""
        fresh = OfficeConnection(self.args)
        fresh.setUp()
        # remembered only once it has been set up successfully
        self.connection = fresh

    def postTest(self):
        """Tear down the test's connection and always forget it."""
        if not self.connection:
            return
        try:
            self.connection.tearDown()
        finally:
            self.connection = None

    def tearDown(self):
        """Nothing to release; only checks that no connection is open."""
        assert not self.connection
class PersistentConnection:
    """Connection policy that shares one OfficeConnection among all tests."""
    def __init__(self, args):
        self.connection = None
        self.args = args

    def getContext(self):
        """Return the component context of the shared connection."""
        shared = self.connection
        return shared.xContext

    def setUp(self):
        """Open and set up the connection used by every test."""
        shared = OfficeConnection(self.args)
        shared.setUp()
        # remembered only once it has been set up successfully
        self.connection = shared

    def preTest(self):
        """Check that the shared connection exists."""
        assert self.connection

    def postTest(self):
        """Check that the shared connection still exists."""
        assert self.connection

    def tearDown(self):
        """Tear down the shared connection and always forget it."""
        if not self.connection:
            return
        try:
            self.connection.tearDown()
        finally:
            self.connection = None
def simpleInvoke(connection, test):
    """Run `test` once on `connection`.

    connection.postTest() is called whether or not preTest() or the
    test itself raised; any exception propagates to the caller.
    """
    try:
        connection.preTest()
        context = connection.getContext()
        test.run(context)
    finally:
        connection.postTest()
def retryInvoke(connection, test):
    """Run `test` on `connection`, making up to 5 attempts.

    Each attempt is bracketed by connection.preTest()/postTest().
    Returns as soon as one attempt succeeds.  KeyboardInterrupt and
    SystemExit abort immediately instead of triggering a retry.

    Raises Exception("FAILED retryInvoke") if all attempts fail.
    """
    import traceback # local import: only needed on the failure path
    for _attempt in range(5):
        try:
            try:
                connection.preTest()
                test.run(connection.getContext())
                return
            finally:
                connection.postTest()
        except (KeyboardInterrupt, SystemExit):
            raise # Ctrl+C and sys.exit() should work
        except:
            # Deliberately broad: a failed attempt of any kind is retried.
            # The traceback is printed so the cause is not lost.
            print("retryInvoke: caught exception")
            traceback.print_exc()
    raise Exception("FAILED retryInvoke")
def runConnectionTests(connection, invoker, tests):
    """Run every test in `tests` through `invoker` on `connection`.

    connection.tearDown() is called afterwards even if setUp() or one
    of the invocations raised; the exception then propagates.
    """
    try:
        connection.setUp()
        for current in tests:
            invoker(connection, current)
    finally:
        connection.tearDown()
class EventListener(XDocumentEventListener,unohelper.Base):
    """Document event listener that records the "OnLayoutFinished" event."""
    def __init__(self):
        # becomes True once an "OnLayoutFinished" event has been seen
        self.layoutFinished = False
    def documentEventOccured(self, event):
        # NOTE(review): "Occured" is spelled as in the original; presumably
        # it has to match the UNO interface method name - do not "correct" it.
#        print(str(event.EventName))
        if event.EventName == "OnLayoutFinished":
            self.layoutFinished = True
    def disposing(self, event):
        # `self` was missing here, so any call on an instance raised
        # TypeError; there is nothing to clean up.
        pass
def mkPropertyValue(name, value):
    """Create a com.sun.star.beans.PropertyValue struct for name/value.

    The two remaining constructor arguments are both passed as 0.
    """
    structType = "com.sun.star.beans.PropertyValue"
    ctorArgs = (name, 0, value, 0)
    return uno.createUnoStruct(structType, *ctorArgs)
250 ### tests ###
def loadFromURL(xContext, url):
    """Load the document at `url` hidden and read-only; wait for layout.

    Polls once a second, for at most 30 seconds, until the global event
    broadcaster has delivered "OnLayoutFinished" to the listener.  The
    document is returned either way; a timeout is only reported on stdout.

    If anything raises, a document that was already loaded is closed and
    the exception is re-raised.  The event listener is always removed.
    """
    xDesktop = xContext.ServiceManager.createInstanceWithContext(
            "com.sun.star.frame.Desktop", xContext)
    props = [("Hidden", True), ("ReadOnly", True)] # FilterName?
    loadProps = tuple([mkPropertyValue(name, value) for (name, value) in props])
    xListener = EventListener()
    xGEB = xContext.ServiceManager.createInstanceWithContext(
        "com.sun.star.frame.GlobalEventBroadcaster", xContext)
    xGEB.addDocumentEventListener(xListener)
    # Bound before the try block: if loadComponentFromURL() itself raises,
    # the handler below must not fail with an UnboundLocalError that hides
    # the real error.
    xDoc = None
    try:
        xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
        for _second in range(30):
            if xListener.layoutFinished:
                return xDoc
            print("delaying...")
            time.sleep(1)
        print("timeout: no OnLayoutFinished received")
        return xDoc
    except:
        # cleanup only; the exception is re-raised unchanged
        if xDoc:
            print("CLOSING")
            xDoc.close(True)
        raise
    finally:
        xGEB.removeDocumentEventListener(xListener)
def printDoc(xContext, xDoc, url):
    """Print `xDoc` with "FileName" set to `url` and wait until done.

    After starting the print job, the printer properties of the document
    are checked once a second until "IsBusy" is reported as false.
    """
    printArgs = (mkPropertyValue("FileName", url),)
    # xDoc.print(printArgs) cannot be written directly, as the original
    # comment notes that "print" is a keyword
    uno.invoke(xDoc, "print", (printArgs,))
    stillBusy = True
    while stillBusy:
        print("printing...")
        time.sleep(1)
        for prop in xDoc.getPrinter():
            if prop.Name == "IsBusy":
                stillBusy = prop.Value
    print("...done printing")
class LoadPrintFileTest:
    """Test that loads one file and prints it to `file` + `prtsuffix`."""
    def __init__(self, file, prtsuffix):
        self.file = file            # path of the document to load
        self.prtsuffix = prtsuffix  # appended to the URL for the print output

    def run(self, xContext):
        """Load the document, print it, and always close it again."""
        print("Loading document: " + self.file)
        # Bound before the try block: if loading fails, the finally clause
        # must not raise an UnboundLocalError that hides the real error.
        xDoc = None
        try:
            # A file URL needs an absolute path; a relative one would put
            # its first component where the host name belongs.
            url = "file://" + quote(os.path.abspath(self.file))
            xDoc = loadFromURL(xContext, url)
            printDoc(xContext, xDoc, url + self.prtsuffix)
        finally:
            if xDoc:
                xDoc.close(True)
            print("...done with: " + self.file)
def runLoadPrintFileTests(opts, dirs, suffix, reference):
    """Load and print every `suffix` file found in `dirs`.

    The print output gets the suffix ".pdf.reference" when `reference`
    is true and ".pdf" otherwise.  All files share one soffice
    connection built from `opts`.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    documents = getFiles(dirs, suffix)
    tests = (LoadPrintFileTest(path, prtsuffix) for path in documents)
    # alternative kept from the original: PerTestConnection(opts)
    connection = PersistentConnection(opts)
    runConnectionTests(connection, simpleInvoke, tests)
def mkImages(file, resolution):
    """Run gs on `file`, writing JPEG images "<file>.NNNN.jpeg".

    `resolution` is passed to gs as a string after "-r".  A non-zero
    exit status of gs raises subprocess.CalledProcessError.
    """
    outputPattern = file + ".%04d.jpeg"
    argv = ["gs", "-r" + resolution, "-sOutputFile=" + outputPattern]
    argv += ["-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file]
    subprocess.check_call(argv)
def mkAllImages(dirs, suffix, resolution, reference):
    """Call mkImages() on the print output of every `suffix` file in `dirs`.

    The print output is the file name plus ".pdf.reference" when
    `reference` is true, and plus ".pdf" otherwise.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    for directory in dirs:
        documents = filelist(directory, suffix)
        print(documents)
        for document in documents:
            mkImages(document + prtsuffix, resolution)
def identify(imagefile):
    """Run ImageMagick's identify on `imagefile` and report a difference.

    The first output line of 'identify -format %k' is expected to be "1";
    anything else is reported on stdout as a difference.
    NOTE(review): "%k" presumably yields the number of unique colours, so
    a single-colour diff image means "no difference" - confirm.

    Raises Exception if identify exits with a non-zero status.
    """
    argv = ["identify", "-format", "%k", imagefile]
    process = subprocess.Popen(argv, stdout=subprocess.PIPE)
    result, _ = process.communicate()
    if process.returncode != 0:
        raise Exception("identify failed")
    if result.partition(b"\n")[0] != b"1":
        # The captured output is bytes and has to be decoded before it can
        # be joined with str; concatenating directly is a TypeError on
        # Python 3.
        print("identify result: " + result.decode("utf-8", "replace"))
        print("DIFFERENCE in " + imagefile)
def compose(refimagefile, imagefile, diffimagefile):
    """Write the "difference" composite of two images to `diffimagefile`.

    Runs ImageMagick's composite; a non-zero exit status raises
    subprocess.CalledProcessError.
    """
    command = ["composite", "-compose", "difference"]
    command.extend([refimagefile, imagefile, diffimagefile])
    subprocess.check_call(command)
def compareImages(file):
    """Compare the page images of `file` with its reference images.

    All ".jpeg" files in the directory of `file` whose path starts with
    `file` are considered; those starting with file + ".pdf.reference"
    are the references.  Images and references are paired in sorted
    order, and each pair is diffed into "<image>.diff" and checked with
    identify().  A mismatch in the number of images is reported on stdout.
    """
    directory = os.path.dirname(file)
    candidates = sorted(
        f for f in filelist(directory, ".jpeg") if f.startswith(file))
    refprefix = file + ".pdf.reference"
    (refimages, images) = partition(
        candidates, lambda f: f.startswith(refprefix))
    for (image, refimage) in zip(images, refimages):
        diffimage = image + ".diff"
        compose(image, refimage, diffimage)
        identify(diffimage)
    if len(images) != len(refimages):
        print("DIFFERENT NUMBER OF IMAGES FOR: " + file)
def compareAllImages(dirs, suffix):
    """Run compareImages() on every `suffix` file found in `dirs`."""
    print("compareAllImages...")
    for directory in dirs:
        for document in filelist(directory, suffix):
            compareImages(document)
    print("...compareAllImages done")
def parseArgs(argv):
    """Parse the command line `argv` (program name first).

    Returns a tuple (options, arguments): a dict mapping each option as
    given on the command line (e.g. "-r", "--soffice") to its value, and
    the list of remaining non-option arguments.
    """
    longOptions = ["help", "soffice=", "userdir=", "reference", "valgrind"]
    optlist, args = getopt.getopt(argv[1:], "hr", longOptions)
    return (dict(optlist), args)
def usage():
    """Print the command line help, using the basename of sys.argv[0]."""
    # The first line used to end in a stray double quote that was printed
    # as part of the help text.
    message = """usage: {program} [option]... [directory]...
 -h | --help:      print usage information
 -r | --reference: generate new reference files (otherwise: compare)
 --soffice=method:location
                   specify soffice instance to connect to
                   supported methods: 'path', 'connect'
 --userdir=URL     specify user installation directory for 'path' method
 --valgrind        pass --valgrind to soffice for 'path' method"""
    print(message.format(program = os.path.basename(sys.argv[0])))
def checkTools():
    """Exit with status 1 unless gs, composite and identify can be run.

    Only the two ways a probe can fail are handled: OSError when the
    program cannot be started and CalledProcessError when it exits with
    a non-zero status.  Anything else (e.g. Ctrl+C) propagates.
    """
    toolErrors = (OSError, subprocess.CalledProcessError)
    try:
        subprocess.check_output(["gs", "--version"])
    except toolErrors:
        print("Cannot execute 'gs'. Please install ghostscript.")
        sys.exit(1)
    try:
        subprocess.check_output(["composite", "-version"])
        subprocess.check_output(["identify", "-version"])
    except toolErrors:
        print("Cannot execute 'composite' or 'identify'.")
        print("Please install ImageMagick.")
        sys.exit(1)
if __name__ == "__main__":
#    checkTools()
    (opts,args) = parseArgs(sys.argv)
    # A help request is answered first; it used to be rejected with exit
    # status 1 whenever no directory was given along with it.
    if "-h" in opts or "--help" in opts:
        usage()
        sys.exit()
    # Both at least one directory and --soffice are required to do any work
    if len(args) == 0 or "--soffice" not in opts:
        usage()
        sys.exit(1)
    reference = "-r" in opts or "--reference" in opts
    # print every document to PDF, then turn the PDFs into page images
    runLoadPrintFileTests(opts, args, ".odt", reference)
    mkAllImages(args, ".odt", "200", reference)
    # when generating references there is nothing to compare against
    if not(reference):
        compareAllImages(args, ".odt")
427 # vim:set shiftwidth=4 softtabstop=4 expandtab: