1 # -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
3 # This file is part of the LibreOffice project.
5 # This Source Code Form is subject to the terms of the Mozilla Public
6 # License, v. 2.0. If a copy of the MPL was not distributed with this
7 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 # Conversion watch, initially intended to detect if document layout changed since the last time it was run.
11 # Print a set of docs, compare the pdf against the old run and highlight the differences
24 from urllib
.parse
import quote
26 from urllib
import quote
33 print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
34 print("PYTHONPATH=/installation/opt/program")
35 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
39 from com
.sun
.star
.document
import XDocumentEventListener
41 print("UNO API class not found: try to set URE_BOOTSTRAP variable")
42 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
48 print(*args
, flush
=True)
50 def partition(list, pred
):
60 def filelist(dir, suffix
):
62 raise Exception("filelist: empty directory")
63 if not(dir[-1] == "/"):
65 files
= [dir + f
for f
in os
.listdir(dir)]
67 return [f
for f
in files
68 if os
.path
.isfile(f
) and os
.path
.splitext(f
)[1] == suffix
]
70 def getFiles(dirs
, suffix
):
73 files
+= filelist(dir, suffix
)
78 class OfficeConnection
:
79 def __init__(self
, args
):
85 (method
, sep
, rest
) = self
.args
["--soffice"].partition(":")
87 raise Exception("soffice parameter does not specify method")
89 self
.socket
= "pipe,name=pytest" + str(uuid
.uuid1())
91 userdir
= self
.args
["--userdir"]
93 raise Exception("'path' method requires --userdir")
94 if not(userdir
.startswith("file://")):
95 raise Exception("--userdir must be file URL")
96 self
.soffice
= self
.bootstrap(rest
, userdir
, self
.socket
)
97 elif method
== "connect":
100 raise Exception("unsupported connection method: " + method
)
101 self
.xContext
= self
.connect(self
.socket
)
103 def bootstrap(self
, soffice
, userdir
, socket
):
104 argv
= [ soffice
, "--accept=" + socket
+ ";urp",
105 "-env:UserInstallation=" + userdir
,
107 "--norestore", "--nologo", "--headless" ]
108 if "--valgrind" in self
.args
:
109 argv
.append("--valgrind")
110 return subprocess
.Popen(argv
)
112 def connect(self
, socket
):
113 xLocalContext
= uno
.getComponentContext()
114 xUnoResolver
= xLocalContext
.ServiceManager
.createInstanceWithContext(
115 "com.sun.star.bridge.UnoUrlResolver", xLocalContext
)
116 url
= "uno:" + socket
+ ";urp;StarOffice.ComponentContext"
117 log("OfficeConnection: connecting to: " + url
)
120 xContext
= xUnoResolver
.resolve(url
)
122 # except com.sun.star.connection.NoConnectException
123 except pyuno
.getClass("com.sun.star.connection.NoConnectException"):
124 log("NoConnectException: sleeping...")
131 log("tearDown: calling terminate()...")
132 xMgr
= self
.xContext
.ServiceManager
133 xDesktop
= xMgr
.createInstanceWithContext(
134 "com.sun.star.frame.Desktop", self
.xContext
)
137 # except com.sun.star.lang.DisposedException:
138 except pyuno
.getClass("com.sun.star.beans.UnknownPropertyException"):
139 log("caught UnknownPropertyException")
140 pass # ignore, also means disposed
141 except pyuno
.getClass("com.sun.star.lang.DisposedException"):
142 log("caught DisposedException")
145 self
.soffice
.terminate()
146 ret
= self
.soffice
.wait()
151 raise Exception("Exit status indicates failure: " + str(ret
))
154 class WatchDog(threading
.Thread
):
155 def __init__(self
, connection
):
156 threading
.Thread
.__init
__(self
, name
="WatchDog " + connection
.socket
)
157 self
.connection
= connection
160 if self
.connection
.soffice
: # not possible for "connect"
161 self
.connection
.soffice
.wait(timeout
=120) # 2 minutes?
162 except subprocess
.TimeoutExpired
:
163 log("WatchDog: TIMEOUT -> killing soffice")
164 self
.connection
.soffice
.terminate() # actually killing oosplash...
165 self
.connection
.xContext
= None
166 log("WatchDog: killed soffice")
168 class PerTestConnection
:
169 def __init__(self
, args
):
171 self
.connection
= None
173 def getContext(self
):
174 return self
.connection
.xContext
176 assert(not(self
.connection
))
178 conn
= OfficeConnection(self
.args
)
180 self
.connection
= conn
181 self
.watchdog
= WatchDog(self
.connection
)
182 self
.watchdog
.start()
186 self
.connection
.tearDown()
188 self
.connection
= None
191 assert(not(self
.connection
))
193 class PersistentConnection
:
194 def __init__(self
, args
):
196 self
.connection
= None
197 def getContext(self
):
198 return self
.connection
.xContext
200 conn
= OfficeConnection(self
.args
)
202 self
.connection
= conn
204 assert(self
.connection
)
206 assert(self
.connection
)
210 self
.connection
.tearDown()
212 self
.connection
= None
214 def simpleInvoke(connection
, test
):
217 test
.run(connection
.getContext())
219 connection
.postTest()
221 def retryInvoke(connection
, test
):
228 test
.run(connection
.getContext())
231 connection
.postTest()
232 except KeyboardInterrupt:
233 raise # Ctrl+C should work
235 log("retryInvoke: caught exception")
236 raise Exception("FAILED retryInvoke")
238 def runConnectionTests(connection
, invoker
, tests
):
244 invoker(connection
, test
)
245 except KeyboardInterrupt:
246 raise # Ctrl+C should work
248 failed
.append(test
.file)
249 estr
= traceback
.format_exc()
250 log("... FAILED with exception:\n" + estr
)
253 connection
.tearDown()
255 class EventListener(XDocumentEventListener
,unohelper
.Base
):
257 self
.layoutFinished
= False
258 def documentEventOccured(self
, event
):
259 # log(str(event.EventName))
260 if event
.EventName
== "OnLayoutFinished":
261 self
.layoutFinished
= True
262 def disposing(event
):
265 def mkPropertyValue(name
, value
):
266 return uno
.createUnoStruct("com.sun.star.beans.PropertyValue",
271 def loadFromURL(xContext
, url
):
272 xDesktop
= xContext
.ServiceManager
.createInstanceWithContext(
273 "com.sun.star.frame.Desktop", xContext
)
274 props
= [("Hidden", True), ("ReadOnly", True)] # FilterName?
275 loadProps
= tuple([mkPropertyValue(name
, value
) for (name
, value
) in props
])
276 xListener
= EventListener()
277 xGEB
= xContext
.getValueByName(
278 "/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
279 xGEB
.addDocumentEventListener(xListener
)
282 xDoc
= xDesktop
.loadComponentFromURL(url
, "_blank", 0, loadProps
)
283 log("...loadComponentFromURL done")
285 raise Exception("No document loaded?")
288 if xListener
.layoutFinished
:
293 log("timeout: no OnLayoutFinished received")
302 xGEB
.removeDocumentEventListener(xListener
)
304 def printDoc(xContext
, xDoc
, url
):
305 props
= [ mkPropertyValue("FileName", url
) ]
307 uno
.invoke(xDoc
, "print", (tuple(props
),)) # damn, that's a keyword!
312 prt
= xDoc
.getPrinter()
314 if value
.Name
== "IsBusy":
316 log("...done printing")
318 class LoadPrintFileTest
:
319 def __init__(self
, file, prtsuffix
):
321 self
.prtsuffix
= prtsuffix
322 def run(self
, xContext
):
323 start
= datetime
.datetime
.now()
324 log("Time: " + str(start
) + " Loading document: " + self
.file)
327 if os
.name
== 'nt' and self
.file[1] == ':':
328 url
= "file:///" + self
.file[0:2] + quote(self
.file[2:])
330 url
= "file://" + quote(self
.file)
331 xDoc
= loadFromURL(xContext
, url
)
332 log("loadFromURL in: " + str(datetime
.datetime
.now() - start
))
333 printDoc(xContext
, xDoc
, url
+ self
.prtsuffix
)
337 end
= datetime
.datetime
.now()
338 log("...done with: " + self
.file + " in: " + str(end
- start
))
340 def runLoadPrintFileTests(opts
, dirs
, suffix
, reference
):
342 prtsuffix
= ".pdf.reference"
345 files
= getFiles(dirs
, suffix
)
346 tests
= (LoadPrintFileTest(file, prtsuffix
) for file in files
)
347 # connection = PersistentConnection(opts)
348 connection
= PerTestConnection(opts
)
349 failed
= runConnectionTests(connection
, simpleInvoke
, tests
)
350 print("all printed: FAILURES: " + str(len(failed
)))
def mkImages(file, resolution):
    """Render *file* to JPEG images by running Ghostscript (``gs``).

    file       -- path of the document handed to ``gs``; it is also the
                  stem of the output name ``<file>.%04d.jpeg``, which is
                  passed to ``gs`` verbatim via ``-sOutputFile=``.
    resolution -- string appended to the ``-r`` switch (e.g. "200").

    Raises subprocess.CalledProcessError when ``gs`` exits with a
    non-zero status.
    """
    # The "%04d" placeholder is not expanded here; gs receives it as-is.
    output_pattern = file + ".%04d.jpeg"
    command = ["gs", "-r" + resolution, "-sOutputFile=" + output_pattern]
    # Batch-mode switches, output device, and finally the input document.
    command += ["-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file]
    subprocess.check_call(command)
360 def mkAllImages(dirs
, suffix
, resolution
, reference
, failed
):
362 prtsuffix
= ".pdf.reference"
366 files
= filelist(dir, suffix
)
370 log("Skipping failed: " + f
)
372 mkImages(f
+ prtsuffix
, resolution
)
def identify(imagefile):
    """Run ImageMagick ``identify`` on *imagefile* and log any difference.

    The tool is invoked with ``-format %k`` (per the ImageMagick
    documentation, the number of unique colors in the image).  If the
    first line of its output is anything other than ``1``, the raw output
    and a "DIFFERENCE" message are logged.

    Raises Exception("identify failed") when the tool exits non-zero.
    """
    child = subprocess.Popen(["identify", "-format", "%k", imagefile],
                             stdout=subprocess.PIPE)
    output = child.communicate()[0]
    if child.wait() != 0:
        raise Exception("identify failed")
    # Only the first output line is considered.
    first_line = output.split(b"\n", 1)[0]
    if first_line == b"1":
        return
    log("identify result: " + output.decode('utf-8'))
    log("DIFFERENCE in " + imagefile)
def compose(refimagefile, imagefile, diffimagefile):
    """Write the difference of two images using ImageMagick ``composite``.

    Runs ``composite -compose difference`` with *refimagefile* and
    *imagefile* as inputs (in that order) and *diffimagefile* as the
    output file.

    Raises subprocess.CalledProcessError when ``composite`` exits with a
    non-zero status.
    """
    command = ["composite", "-compose", "difference"]
    command.extend([refimagefile, imagefile, diffimagefile])
    subprocess.check_call(command)
def compareImages(file):
    """Compare the page images rendered for *file* with their references.

    Collects every ``.jpeg`` in the directory of *file* whose path starts
    with *file*, sorts them, and splits them into reference images (path
    starts with ``file + ".pdf.reference"``) and all others.  The two
    lists are paired in order; for each pair a ``<image>.diff`` file is
    composed and then checked with identify().  A mismatch in the number
    of images is logged after the pairs have been processed.
    """
    reference_prefix = file + ".pdf.reference"

    def is_reference(path):
        return path.startswith(reference_prefix)

    candidates = sorted(
        path for path in filelist(os.path.dirname(file), ".jpeg")
        if path.startswith(file))
    # NOTE(review): partition() is defined elsewhere in this file; it is
    # presumed to return (items matching the predicate, the rest) -- confirm.
    refimages, images = partition(candidates, is_reference)
    # zip() stops at the shorter list, so surplus pages are not compared.
    for image, refimage in zip(images, refimages):
        diffimage = image + ".diff"
        compose(image, refimage, diffimage)
        identify(diffimage)
    if len(refimages) != len(images):
        log("DIFFERENT NUMBER OF IMAGES FOR: " + file)
404 def compareAllImages(dirs
, suffix
):
405 log("compareAllImages...")
407 files
= filelist(dir, suffix
)
408 # log("compareAllImages:" + str(files))
411 log("...compareAllImages done")
415 (optlist
,args
) = getopt
.getopt(argv
[1:], "hr",
416 ["help", "soffice=", "userdir=", "reference", "valgrind"])
418 return (dict(optlist
), args
)
421 message
= """usage: {program} [option]... [directory]..."
422 -h | --help: print usage information
423 -r | --reference: generate new reference files (otherwise: compare)
424 --soffice=method:location
425 specify soffice instance to connect to
426 supported methods: 'path', 'connect'
427 --userdir=URL specify user installation directory for 'path' method
428 --valgrind pass --valgrind to soffice for 'path' method"""
429 print(message
.format(program
= os
.path
.basename(sys
.argv
[0])))
433 subprocess
.check_output(["gs", "--version"])
435 print("Cannot execute 'gs'. Please install ghostscript.")
438 subprocess
.check_output(["composite", "-version"])
439 subprocess
.check_output(["identify", "-version"])
441 print("Cannot execute 'composite' or 'identify'.")
442 print("Please install ImageMagick.")
445 if __name__
== "__main__":
447 (opts
,args
) = parseArgs(sys
.argv
)
451 if "-h" in opts
or "--help" in opts
:
454 elif "--soffice" in opts
:
455 reference
= "-r" in opts
or "--reference" in opts
456 failed
= runLoadPrintFileTests(opts
, args
, ".odt", reference
)
457 mkAllImages(args
, ".odt", "200", reference
, failed
)
459 compareAllImages(args
, ".odt")
464 # vim: set shiftwidth=4 softtabstop=4 expandtab: