tdf#131098 docx export: write fill property of graphic
[LibreOffice.git] / bin / convwatch.py
blob93082186f31958d9b958eaf9d843a41764e2a8c1
1 # -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
3 # This file is part of the LibreOffice project.
5 # This Source Code Form is subject to the terms of the Mozilla Public
6 # License, v. 2.0. If a copy of the MPL was not distributed with this
7 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 # Conversion watch, initially intended to detect if document layout changed since the last time it was run.
11 # Print a set of docs, compare the pdf against the old run and highlight the differences
14 import getopt
15 import os
16 import subprocess
17 import sys
18 import time
19 import uuid
20 import datetime
21 import traceback
22 import threading
23 try:
24 from urllib.parse import quote
25 except ImportError:
26 from urllib import quote
28 try:
29 import pyuno
30 import uno
31 import unohelper
32 except ImportError:
33 print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
34 print("PYTHONPATH=/installation/opt/program")
35 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
36 raise
38 try:
39 from com.sun.star.document import XDocumentEventListener
40 except ImportError:
41 print("UNO API class not found: try to set URE_BOOTSTRAP variable")
42 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
43 raise
45 ### utilities ###
def log(*args):
    """Print the given values on one line and flush stdout right away.

    Flushing keeps this script's output ordered when it is interleaved
    with the output of child processes.
    """
    print(*args, file=None, flush=True)
def partition(list, pred):
    """Split an iterable in two according to a predicate.

    Returns a tuple ``(matching, rest)`` of lists: elements for which
    ``pred(element)`` is truthy, and all remaining elements. The input
    order is preserved in both lists and ``pred`` is called exactly once
    per element.
    """
    matching, rest = [], []
    for item in list:
        bucket = matching if pred(item) else rest
        bucket.append(item)
    return (matching, rest)
def filelist(dir, suffix):
    """Return the regular files directly inside ``dir`` whose extension is ``suffix``.

    ``suffix`` includes the leading dot (e.g. ".odt"). Each returned path is
    ``dir`` (with a trailing "/" added if missing) followed by the file name.
    Raises Exception when ``dir`` is the empty string.
    """
    if len(dir) == 0:
        raise Exception("filelist: empty directory")
    prefix = dir if dir[-1] == "/" else dir + "/"
    matches = []
    for name in os.listdir(prefix):
        path = prefix + name
        if not os.path.isfile(path):
            continue
        if os.path.splitext(path)[1] == suffix:
            matches.append(path)
    return matches
def getFiles(dirs, suffix):
    """Collect the files with extension ``suffix`` from every directory in ``dirs``.

    Results are concatenated in the order of ``dirs``; see filelist() for the
    per-directory rules.
    """
    return [path
            for directory in dirs
            for path in filelist(directory, suffix)]
76 ### UNO utilities ###
class OfficeConnection:
    """A UNO connection to one soffice instance.

    ``args`` is the option dictionary of this script. The "--soffice" entry
    has the form "method:location" where method is either "path" (start a
    new soffice from the given executable; requires "--userdir") or
    "connect" (attach to an already running instance via the given
    connection string).
    """

    def __init__(self, args):
        self.args = args
        self.soffice = None    # Popen object; only set for the "path" method
        self.socket = None     # UNO connection string
        self.xContext = None   # remote component context once connected

    def setUp(self):
        """Start soffice if requested and connect to it.

        Raises Exception for a malformed or unsupported "--soffice" value
        and for a missing or non-"file://" "--userdir" with method "path".
        """
        method, sep, rest = self.args["--soffice"].partition(":")
        if sep != ":":
            raise Exception("soffice parameter does not specify method")
        if method not in ("path", "connect"):
            raise Exception("unsupported connection method: " + method)

        if method == "connect":
            self.socket = rest
        else:
            # unique pipe name so that several instances can coexist
            self.socket = "pipe,name=pytest" + str(uuid.uuid1())
            try:
                userdir = self.args["--userdir"]
            except KeyError:
                raise Exception("'path' method requires --userdir")
            if not userdir.startswith("file://"):
                raise Exception("--userdir must be file URL")
            self.soffice = self.bootstrap(rest, userdir, self.socket)

        self.xContext = self.connect(self.socket)

    def bootstrap(self, soffice, userdir, socket):
        """Launch the ``soffice`` executable headless, accepting on ``socket``.

        Returns the subprocess.Popen object of the started process.
        """
        argv = [soffice]
        argv.append("--accept=" + socket + ";urp")
        argv.append("-env:UserInstallation=" + userdir)
        argv.extend(["--quickstart=no",
                     "--norestore", "--nologo", "--headless"])
        if "--valgrind" in self.args:
            argv.append("--valgrind")
        return subprocess.Popen(argv)

    def connect(self, socket):
        """Resolve the remote component context, retrying once per second.

        Loops for as long as the resolver raises NoConnectException; there
        is no upper bound on the number of attempts.
        """
        xLocalContext = uno.getComponentContext()
        xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
            "com.sun.star.bridge.UnoUrlResolver", xLocalContext)
        url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
        log("OfficeConnection: connecting to: " + url)
        while True:
            try:
                return xUnoResolver.resolve(url)
            except pyuno.getClass("com.sun.star.connection.NoConnectException"):
                log("NoConnectException: sleeping...")
                time.sleep(1)

    def tearDown(self):
        """Shut down a soffice process started by setUp() and wait for it.

        Does nothing when no process was started (method "connect").
        Raises Exception when the process exits with a non-zero status.
        """
        if not self.soffice:
            return
        if not self.xContext:
            # never connected (or connection dropped): stop the process
            self.soffice.terminate()
        else:
            try:
                log("tearDown: calling terminate()...")
                xMgr = self.xContext.ServiceManager
                xDesktop = xMgr.createInstanceWithContext(
                    "com.sun.star.frame.Desktop", self.xContext)
                xDesktop.terminate()
                log("...done")
            except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"):
                # ignore, also means disposed
                log("caught UnknownPropertyException")
            except pyuno.getClass("com.sun.star.lang.DisposedException"):
                # ignore
                log("caught DisposedException")
        ret = self.soffice.wait()
        self.xContext = None
        self.socket = None
        self.soffice = None
        if ret != 0:
            raise Exception("Exit status indicates failure: " + str(ret))
class WatchDog(threading.Thread):
    """Thread that terminates a started soffice process after two minutes.

    ``connection`` is an OfficeConnection whose ``socket`` is already set
    (it is used for the thread name).
    """

    def __init__(self, connection):
        super().__init__(name="WatchDog " + connection.socket)
        self.connection = connection

    def run(self):
        """Wait up to 120 seconds for soffice to exit; terminate it on timeout."""
        if not self.connection.soffice:
            # not possible for "connect": there is no process to watch
            return
        try:
            self.connection.soffice.wait(timeout=120)  # 2 minutes?
        except subprocess.TimeoutExpired:
            log("WatchDog: TIMEOUT -> killing soffice")
            self.connection.soffice.terminate()  # actually killing oosplash...
            # dropping the context makes OfficeConnection.tearDown() skip
            # the UNO terminate() call
            self.connection.xContext = None
            log("WatchDog: killed soffice")
class PerTestConnection:
    """Connection policy that uses a fresh OfficeConnection for every test.

    Each connection is guarded by a WatchDog thread for the duration of
    the test.
    """

    def __init__(self, args):
        self.args = args
        self.connection = None
        self.watchdog = None

    def getContext(self):
        """Return the component context of the current connection."""
        return self.connection.xContext

    def setUp(self):
        """Nothing to prepare; only checks that no connection is open."""
        assert not self.connection

    def preTest(self):
        """Open a new connection and start its watchdog."""
        fresh = OfficeConnection(self.args)
        fresh.setUp()
        # only publish the connection once setUp() has succeeded
        self.connection = fresh
        self.watchdog = WatchDog(fresh)
        self.watchdog.start()

    def postTest(self):
        """Close the current connection, if any, and wait for the watchdog."""
        if not self.connection:
            return
        try:
            self.connection.tearDown()
        finally:
            self.connection = None
            self.watchdog.join()

    def tearDown(self):
        """Nothing to clean up; only checks that no connection is left open."""
        assert not self.connection
class PersistentConnection:
    """Connection policy that shares one OfficeConnection across all tests."""

    def __init__(self, args):
        self.args = args
        self.connection = None

    def getContext(self):
        """Return the component context of the shared connection."""
        return self.connection.xContext

    def setUp(self):
        """Open the shared connection."""
        shared = OfficeConnection(self.args)
        shared.setUp()
        # only publish the connection once setUp() has succeeded
        self.connection = shared

    def preTest(self):
        """Check that the shared connection exists."""
        assert self.connection

    def postTest(self):
        """Check that the shared connection still exists."""
        assert self.connection

    def tearDown(self):
        """Close the shared connection, if any."""
        if not self.connection:
            return
        try:
            self.connection.tearDown()
        finally:
            self.connection = None
def simpleInvoke(connection, test):
    """Run ``test`` once against ``connection``.

    ``connection.postTest()`` is always called, even when ``preTest()`` or
    the test itself raises; the exception then propagates to the caller.
    """
    try:
        connection.preTest()
        xContext = connection.getContext()
        test.run(xContext)
    finally:
        connection.postTest()
def retryInvoke(connection, test, tries=5):
    """Run ``test`` against ``connection``, retrying after a failure.

    Each attempt calls ``connection.preTest()``, runs the test and always
    calls ``connection.postTest()``. The function returns after the first
    attempt that completes without raising.

    ``tries`` is the maximum number of attempts (default 5).

    Raises Exception("FAILED retryInvoke") when every attempt failed.
    KeyboardInterrupt and SystemExit are not treated as test failures:
    they propagate immediately, so Ctrl+C and sys.exit() keep working.
    """
    for _attempt in range(tries):
        try:
            try:
                connection.preTest()
                test.run(connection.getContext())
                return
            finally:
                connection.postTest()
        except Exception:
            # Only ordinary errors are retried; BaseException subclasses
            # such as KeyboardInterrupt are deliberately not caught here.
            log("retryInvoke: caught exception")
    raise Exception("FAILED retryInvoke")
def runConnectionTests(connection, invoker, tests):
    """Run every test through ``invoker`` and collect the failures.

    ``connection.setUp()`` is called first and ``connection.tearDown()``
    is always called at the end. ``invoker(connection, test)`` is called
    for each element of ``tests``; when it raises an ordinary exception,
    ``test.file`` is recorded and the traceback is logged.

    Returns the list of ``file`` attributes of the failed tests.
    KeyboardInterrupt and SystemExit are not recorded as failures: they
    abort the run (after tearDown), so Ctrl+C keeps working.
    """
    try:
        connection.setUp()
        failed = []
        for test in tests:
            try:
                invoker(connection, test)
            except Exception:
                failed.append(test.file)
                estr = traceback.format_exc()
                log("... FAILED with exception:\n" + estr)
        return failed
    finally:
        connection.tearDown()
class EventListener(XDocumentEventListener, unohelper.Base):
    """Document event listener that records when layout has finished.

    ``layoutFinished`` becomes True once an event named "OnLayoutFinished"
    has been received.
    """

    def __init__(self):
        self.layoutFinished = False

    def documentEventOccured(self, event):
        """Handle a document event; only "OnLayoutFinished" is of interest."""
        # NOTE: the method name (including its spelling) is dictated by the
        # XDocumentEventListener interface imported above; do not "fix" it.
        if event.EventName == "OnLayoutFinished":
            self.layoutFinished = True

    def disposing(self, event):
        """Called when the broadcaster goes away; nothing to release here."""
        # The original definition lacked ``self``, so calling it on an
        # instance with an event argument raised TypeError.
        pass
def mkPropertyValue(name, value):
    """Build a com.sun.star.beans.PropertyValue struct for ``name``/``value``."""
    # The two zeros are the struct members around ``value``
    # (presumably Handle and State -- confirm against the UNO API docs).
    struct_type = "com.sun.star.beans.PropertyValue"
    return uno.createUnoStruct(struct_type, name, 0, value, 0)
269 ### tests ###
def loadFromURL(xContext, url):
    """Load the document at ``url`` hidden and read-only and return it.

    After loading, waits up to about 30 seconds (polling once per second)
    for an "OnLayoutFinished" document event. When the event does not
    arrive in time, a message is logged and the document is returned
    anyway.

    Raises Exception when no document was loaded. On any error the
    document (if one was loaded) is closed before the error propagates.
    The temporary event listener is always removed again.
    """
    xDesktop = xContext.ServiceManager.createInstanceWithContext(
        "com.sun.star.frame.Desktop", xContext)
    loadProps = tuple(
        mkPropertyValue(name, value)
        for (name, value) in (("Hidden", True), ("ReadOnly", True)))  # FilterName?
    xListener = EventListener()
    xGEB = xContext.getValueByName(
        "/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
    xGEB.addDocumentEventListener(xListener)
    xDoc = None
    try:
        xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
        log("...loadComponentFromURL done")
        if xDoc is None:
            raise Exception("No document loaded?")
        for _second in range(30):
            if xListener.layoutFinished:
                return xDoc
            log("delaying...")
            time.sleep(1)
        log("timeout: no OnLayoutFinished received")
        return xDoc
    except BaseException:
        # close the half-loaded document on any error, then re-raise
        if xDoc:
            log("CLOSING")
            xDoc.close(True)
        raise
    finally:
        if xListener:
            xGEB.removeDocumentEventListener(xListener)
def printDoc(xContext, xDoc, url):
    """Print ``xDoc`` to the file ``url`` and block until printing is done.

    Polls the document's printer properties once per second until the
    "IsBusy" property is falsy. ``xContext`` is not used.
    """
    printArgs = (mkPropertyValue("FileName", url),)
    # "print" is a Python keyword, so xDoc.print(...) cannot be written
    # directly; go through uno.invoke instead.
    uno.invoke(xDoc, "print", (printArgs,))
    busy = True
    while busy:
        log("printing...")
        time.sleep(1)
        for prop in xDoc.getPrinter():
            if prop.Name == "IsBusy":
                busy = prop.Value
    log("...done printing")
class LoadPrintFileTest:
    """Test case: load one document and print it to a file next to it.

    ``file`` is the path of the document; the output is written to the
    document's URL with ``prtsuffix`` appended.
    """

    def __init__(self, file, prtsuffix):
        self.file = file
        self.prtsuffix = prtsuffix

    def run(self, xContext):
        """Load ``self.file``, print it, and always close the document.

        Timing information is logged at the start, after loading and at
        the end (also when an error occurred).
        """
        start = datetime.datetime.now()
        log("Time: " + str(start) + " Loading document: " + self.file)
        xDoc = None
        try:
            hasDrive = os.name == 'nt' and self.file[1] == ':'
            if hasDrive:
                # keep the "X:" drive prefix unquoted
                drive, tail = self.file[0:2], self.file[2:]
                url = "file:///" + drive + quote(tail)
            else:
                url = "file://" + quote(self.file)
            xDoc = loadFromURL(xContext, url)
            loadTime = datetime.datetime.now() - start
            log("loadFromURL in: " + str(loadTime))
            printDoc(xContext, xDoc, url + self.prtsuffix)
        finally:
            if xDoc:
                xDoc.close(True)
            end = datetime.datetime.now()
            log("...done with: " + self.file + " in: " + str(end - start))
def runLoadPrintFileTests(opts, dirs, suffix, reference):
    """Load and print every document with extension ``suffix`` in ``dirs``.

    Output files get the suffix ".pdf.reference" when ``reference`` is
    true, otherwise ".pdf". Each document is processed with its own
    soffice connection. A summary of failures is printed.

    Returns the list of documents that failed.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    files = getFiles(dirs, suffix)
    tests = (LoadPrintFileTest(path, prtsuffix) for path in files)
    # alternative: connection = PersistentConnection(opts)
    connection = PerTestConnection(opts)
    failed = runConnectionTests(connection, simpleInvoke, tests)
    print("all printed: FAILURES: " + str(len(failed)))
    for name in failed:
        print(name)
    return failed
def mkImages(file, resolution):
    """Render ``file`` with ghostscript into JPEG images.

    The images are written next to the input as ``<file>.NNNN.jpeg``
    (four-digit counter). ``resolution`` is passed to ghostscript's "-r"
    option and must be a string. Raises subprocess.CalledProcessError when
    ghostscript exits with a non-zero status.
    """
    output = "-sOutputFile=" + file + ".%04d.jpeg"
    argv = ["gs", "-r" + resolution, output,
            "-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file]
    subprocess.check_call(argv)
def mkAllImages(dirs, suffix, resolution, reference, failed):
    """Render the printed output of every document in ``dirs`` to images.

    For each document with extension ``suffix`` that is not listed in
    ``failed``, the corresponding ".pdf.reference" (when ``reference`` is
    true) or ".pdf" file is converted with mkImages().
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    for directory in dirs:
        files = filelist(directory, suffix)
        log(files)
        for doc in files:
            if doc in failed:
                log("Skipping failed: " + doc)
                continue
            mkImages(doc + prtsuffix, resolution)
def identify(imagefile):
    """Log a DIFFERENCE message unless ``identify`` reports "1" for the image.

    Runs ImageMagick's ``identify -format %k`` on ``imagefile`` and looks
    at the first line of its output (presumably the number of unique
    colors, so that "1" means a uniform difference image -- confirm
    against the ImageMagick documentation).

    Raises Exception when the tool exits with a non-zero status.
    """
    process = subprocess.Popen(["identify", "-format", "%k", imagefile],
                               stdout=subprocess.PIPE)
    output = process.communicate()[0]
    # communicate() has already waited for the process to finish
    if process.returncode != 0:
        raise Exception("identify failed")
    firstLine = output.split(b"\n", 1)[0]
    if firstLine != b"1":
        log("identify result: " + output.decode('utf-8'))
        log("DIFFERENCE in " + imagefile)
def compose(refimagefile, imagefile, diffimagefile):
    """Write the "difference" composite of two images to ``diffimagefile``.

    Runs ImageMagick's ``composite``; raises subprocess.CalledProcessError
    when it exits with a non-zero status.
    """
    subprocess.check_call([
        "composite", "-compose", "difference",
        refimagefile, imagefile, diffimagefile,
    ])
def compareImages(file):
    """Compare the page images of ``file`` against its reference images.

    All ".jpeg" files in the document's directory whose path starts with
    ``file`` are considered; those starting with ``file + ".pdf.reference"``
    are the reference images. Both groups are sorted and compared pairwise:
    a ``<image>.diff`` file is written and checked for each pair. A message
    is logged when the two groups differ in size (surplus images are not
    compared).
    """
    jpegs = filelist(os.path.dirname(file), ".jpeg")
    allimages = sorted(f for f in jpegs if f.startswith(file))
    refprefix = file + ".pdf.reference"
    (refimages, images) = partition(
        allimages, lambda f: f.startswith(refprefix))
    for (image, refimage) in zip(images, refimages):
        diffimage = image + ".diff"
        # NOTE(review): compose() names its first two parameters
        # (refimagefile, imagefile) but is called with (image, refimage);
        # presumably harmless for a "difference" composite -- confirm.
        compose(image, refimage, diffimage)
        identify(diffimage)
    if len(images) != len(refimages):
        log("DIFFERENT NUMBER OF IMAGES FOR: " + file)
def compareAllImages(dirs, suffix):
    """Run compareImages() for every document with ``suffix`` in ``dirs``."""
    log("compareAllImages...")
    for directory in dirs:
        for doc in filelist(directory, suffix):
            compareImages(doc)
    log("...compareAllImages done")
def parseArgs(argv):
    """Parse the command line ``argv`` (including the program name).

    Returns ``(opts, args)``: a dictionary mapping each given option (with
    its leading dashes) to its value ("" for flags), and the list of
    remaining positional arguments. Raises getopt.GetoptError for unknown
    options or missing option values.
    """
    shortopts = "hr"
    longopts = ["help", "soffice=", "userdir=", "reference", "valgrind"]
    optlist, args = getopt.getopt(argv[1:], shortopts, longopts)
    return (dict(optlist), args)
def usage():
    """Print the command-line synopsis and option summary to stdout.

    The program name shown is the base name of ``sys.argv[0]``.
    """
    # The synopsis line previously ended in a stray double quote that was
    # printed verbatim; it is removed here.
    message = """usage: {program} [option]... [directory]...
 -h | --help:      print usage information
 -r | --reference: generate new reference files (otherwise: compare)
 --soffice=method:location
                   specify soffice instance to connect to
                   supported methods: 'path', 'connect'
 --userdir=URL     specify user installation directory for 'path' method
 --valgrind        pass --valgrind to soffice for 'path' method"""
    print(message.format(program=os.path.basename(sys.argv[0])))
def checkTools():
    """Verify that ghostscript and ImageMagick can be executed.

    Runs ``gs --version``, ``composite -version`` and ``identify -version``.
    When a tool cannot be started or exits with a non-zero status, a hint
    is printed and the script exits with status 1.

    Only launch failures (OSError) and non-zero exit statuses
    (CalledProcessError) are treated as "tool missing"; KeyboardInterrupt
    and other unrelated errors propagate instead of being misreported.
    """
    tool_errors = (OSError, subprocess.CalledProcessError)
    try:
        subprocess.check_output(["gs", "--version"])
    except tool_errors:
        print("Cannot execute 'gs'. Please install ghostscript.")
        sys.exit(1)
    try:
        subprocess.check_output(["composite", "-version"])
        subprocess.check_output(["identify", "-version"])
    except tool_errors:
        print("Cannot execute 'composite' or 'identify'.")
        print("Please install ImageMagick.")
        sys.exit(1)
if __name__ == "__main__":
    # Script entry point: print documents, render them to images and, unless
    # generating references, compare against the reference images.
    try:
        (opts, args) = parseArgs(sys.argv)
    except getopt.GetoptError as err:
        # report bad options as a usage error instead of a traceback
        print(err)
        usage()
        sys.exit(1)
    # Handle --help before anything else so that it works (and exits with
    # status 0) even without directory arguments or installed tools.
    if "-h" in opts or "--help" in opts:
        usage()
        sys.exit()
    if len(args) == 0 or "--soffice" not in opts:
        usage()
        sys.exit(1)
    checkTools()
    reference = "-r" in opts or "--reference" in opts
    failed = runLoadPrintFileTests(opts, args, ".odt", reference)
    mkAllImages(args, ".odt", "200", reference, failed)
    if not reference:
        compareAllImages(args, ".odt")
464 # vim: set shiftwidth=4 softtabstop=4 expandtab: