1 # -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
3 # This file is part of the LibreOffice project.
5 # This Source Code Form is subject to the terms of the Mozilla Public
6 # License, v. 2.0. If a copy of the MPL was not distributed with this
7 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
20 from urllib
.parse
import quote
22 from urllib
import quote
29 print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
30 print("PYTHONPATH=/installation/opt/program")
31 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
35 from com
.sun
.star
.document
import XDocumentEventListener
37 print("UNO API class not found: try to set URE_BOOTSTRAP variable")
38 print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
44 print(*args
, flush
=True)
46 def partition(list, pred
):
56 def filelist(dir, suffix
):
58 raise Exception("filelist: empty directory")
59 if not(dir[-1] == "/"):
61 files
= [dir + f
for f
in os
.listdir(dir)]
63 return [f
for f
in files
64 if os
.path
.isfile(f
) and os
.path
.splitext(f
)[1] == suffix
]
66 def getFiles(dirs
, suffix
):
69 files
+= filelist(dir, suffix
)
74 class OfficeConnection
:
75 def __init__(self
, args
):
81 (method
, sep
, rest
) = self
.args
["--soffice"].partition(":")
83 raise Exception("soffice parameter does not specify method")
85 self
.socket
= "pipe,name=pytest" + str(uuid
.uuid1())
87 userdir
= self
.args
["--userdir"]
89 raise Exception("'path' method requires --userdir")
90 if not(userdir
.startswith("file://")):
91 raise Exception("--userdir must be file URL")
92 self
.soffice
= self
.bootstrap(rest
, userdir
, self
.socket
)
93 elif method
== "connect":
96 raise Exception("unsupported connection method: " + method
)
97 self
.xContext
= self
.connect(self
.socket
)
99 def bootstrap(self
, soffice
, userdir
, socket
):
100 argv
= [ soffice
, "--accept=" + socket
+ ";urp",
101 "-env:UserInstallation=" + userdir
,
103 "--norestore", "--nologo", "--headless" ]
104 if "--valgrind" in self
.args
:
105 argv
.append("--valgrind")
106 return subprocess
.Popen(argv
)
108 def connect(self
, socket
):
109 xLocalContext
= uno
.getComponentContext()
110 xUnoResolver
= xLocalContext
.ServiceManager
.createInstanceWithContext(
111 "com.sun.star.bridge.UnoUrlResolver", xLocalContext
)
112 url
= "uno:" + socket
+ ";urp;StarOffice.ComponentContext"
113 log("OfficeConnection: connecting to: " + url
)
116 xContext
= xUnoResolver
.resolve(url
)
118 # except com.sun.star.connection.NoConnectException
119 except pyuno
.getClass("com.sun.star.connection.NoConnectException"):
120 log("NoConnectException: sleeping...")
127 log("tearDown: calling terminate()...")
128 xMgr
= self
.xContext
.ServiceManager
129 xDesktop
= xMgr
.createInstanceWithContext(
130 "com.sun.star.frame.Desktop", self
.xContext
)
133 # except com.sun.star.lang.DisposedException:
134 except pyuno
.getClass("com.sun.star.beans.UnknownPropertyException"):
135 log("caught UnknownPropertyException")
136 pass # ignore, also means disposed
137 except pyuno
.getClass("com.sun.star.lang.DisposedException"):
138 log("caught DisposedException")
141 self
.soffice
.terminate()
142 ret
= self
.soffice
.wait()
147 raise Exception("Exit status indicates failure: " + str(ret
))
150 class WatchDog(threading
.Thread
):
151 def __init__(self
, connection
):
152 threading
.Thread
.__init
__(self
, name
="WatchDog " + connection
.socket
)
153 self
.connection
= connection
156 if self
.connection
.soffice
: # not possible for "connect"
157 self
.connection
.soffice
.wait(timeout
=120) # 2 minutes?
158 except subprocess
.TimeoutExpired
:
159 log("WatchDog: TIMEOUT -> killing soffice")
160 self
.connection
.soffice
.terminate() # actually killing oosplash...
161 self
.connection
.xContext
= None
162 log("WatchDog: killed soffice")
164 class PerTestConnection
:
165 def __init__(self
, args
):
167 self
.connection
= None
169 def getContext(self
):
170 return self
.connection
.xContext
172 assert(not(self
.connection
))
174 conn
= OfficeConnection(self
.args
)
176 self
.connection
= conn
177 self
.watchdog
= WatchDog(self
.connection
)
178 self
.watchdog
.start()
182 self
.connection
.tearDown()
184 self
.connection
= None
187 assert(not(self
.connection
))
189 class PersistentConnection
:
190 def __init__(self
, args
):
192 self
.connection
= None
193 def getContext(self
):
194 return self
.connection
.xContext
196 conn
= OfficeConnection(self
.args
)
198 self
.connection
= conn
200 assert(self
.connection
)
202 assert(self
.connection
)
206 self
.connection
.tearDown()
208 self
.connection
= None
210 def simpleInvoke(connection
, test
):
213 test
.run(connection
.getContext())
215 connection
.postTest()
217 def retryInvoke(connection
, test
):
224 test
.run(connection
.getContext())
227 connection
.postTest()
228 except KeyboardInterrupt:
229 raise # Ctrl+C should work
231 log("retryInvoke: caught exception")
232 raise Exception("FAILED retryInvoke")
234 def runConnectionTests(connection
, invoker
, tests
):
240 invoker(connection
, test
)
241 except KeyboardInterrupt:
242 raise # Ctrl+C should work
244 failed
.append(test
.file)
245 estr
= traceback
.format_exc()
246 log("... FAILED with exception:\n" + estr
)
249 connection
.tearDown()
251 class EventListener(XDocumentEventListener
,unohelper
.Base
):
253 self
.layoutFinished
= False
254 def documentEventOccured(self
, event
):
255 # log(str(event.EventName))
256 if event
.EventName
== "OnLayoutFinished":
257 self
.layoutFinished
= True
258 def disposing(event
):
261 def mkPropertyValue(name
, value
):
262 return uno
.createUnoStruct("com.sun.star.beans.PropertyValue",
267 def loadFromURL(xContext
, url
):
268 xDesktop
= xContext
.ServiceManager
.createInstanceWithContext(
269 "com.sun.star.frame.Desktop", xContext
)
270 props
= [("Hidden", True), ("ReadOnly", True)] # FilterName?
271 loadProps
= tuple([mkPropertyValue(name
, value
) for (name
, value
) in props
])
272 xListener
= EventListener()
273 xGEB
= xContext
.getValueByName(
274 "/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
275 xGEB
.addDocumentEventListener(xListener
)
278 xDoc
= xDesktop
.loadComponentFromURL(url
, "_blank", 0, loadProps
)
280 raise Exception("No document loaded?")
283 if xListener
.layoutFinished
:
288 log("timeout: no OnLayoutFinished received")
297 xGEB
.removeDocumentEventListener(xListener
)
299 def printDoc(xContext
, xDoc
, url
):
300 props
= [ mkPropertyValue("FileName", url
) ]
302 uno
.invoke(xDoc
, "print", (tuple(props
),)) # damn, that's a keyword!
307 prt
= xDoc
.getPrinter()
309 if value
.Name
== "IsBusy":
311 log("...done printing")
313 class LoadPrintFileTest
:
314 def __init__(self
, file, prtsuffix
):
316 self
.prtsuffix
= prtsuffix
317 def run(self
, xContext
):
318 start
= datetime
.datetime
.now()
319 log("Time: " + str(start
) + " Loading document: " + self
.file)
322 url
= "file://" + quote(self
.file)
323 xDoc
= loadFromURL(xContext
, url
)
324 printDoc(xContext
, xDoc
, url
+ self
.prtsuffix
)
328 end
= datetime
.datetime
.now()
329 log("...done with: " + self
.file + " in: " + str(end
- start
))
331 def runLoadPrintFileTests(opts
, dirs
, suffix
, reference
):
333 prtsuffix
= ".pdf.reference"
336 files
= getFiles(dirs
, suffix
)
337 tests
= (LoadPrintFileTest(file, prtsuffix
) for file in files
)
338 # connection = PersistentConnection(opts)
339 connection
= PerTestConnection(opts
)
340 failed
= runConnectionTests(connection
, simpleInvoke
, tests
)
341 print("all printed: FAILURES: " + str(len(failed
)))
346 def mkImages(file, resolution
):
347 argv
= [ "gs", "-r" + resolution
, "-sOutputFile=" + file + ".%04d.jpeg",
348 "-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file ]
349 ret
= subprocess
.check_call(argv
)
351 def mkAllImages(dirs
, suffix
, resolution
, reference
, failed
):
353 prtsuffix
= ".pdf.reference"
357 files
= filelist(dir, suffix
)
361 log("Skipping failed: " + f
)
363 mkImages(f
+ prtsuffix
, resolution
)
365 def identify(imagefile
):
366 argv
= ["identify", "-format", "%k", imagefile
]
367 process
= subprocess
.Popen(argv
, stdout
=subprocess
.PIPE
)
368 result
, _
= process
.communicate()
369 if process
.wait() != 0:
370 raise Exception("identify failed")
371 if result
.partition(b
"\n")[0] != b
"1":
372 log("identify result: " + result
.decode('utf-8'))
373 log("DIFFERENCE in " + imagefile
)
375 def compose(refimagefile
, imagefile
, diffimagefile
):
376 argv
= [ "composite", "-compose", "difference",
377 refimagefile
, imagefile
, diffimagefile
]
378 subprocess
.check_call(argv
)
380 def compareImages(file):
381 allimages
= [f
for f
in filelist(os
.path
.dirname(file), ".jpeg")
382 if f
.startswith(file)]
383 # refimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
384 # if f.startswith(file + ".reference")]
385 # log("compareImages: allimages:" + str(allimages))
386 (refimages
, images
) = partition(sorted(allimages
),
387 lambda f
: f
.startswith(file + ".pdf.reference"))
388 # log("compareImages: images" + str(images))
389 for (image
, refimage
) in zip(images
, refimages
):
390 compose(image
, refimage
, image
+ ".diff")
391 identify(image
+ ".diff")
392 if (len(images
) != len(refimages
)):
393 log("DIFFERENT NUMBER OF IMAGES FOR: " + file)
395 def compareAllImages(dirs
, suffix
):
396 log("compareAllImages...")
398 files
= filelist(dir, suffix
)
399 # log("compareAllImages:" + str(files))
402 log("...compareAllImages done")
406 (optlist
,args
) = getopt
.getopt(argv
[1:], "hr",
407 ["help", "soffice=", "userdir=", "reference", "valgrind"])
409 return (dict(optlist
), args
)
412 message
= """usage: {program} [option]... [directory]..."
413 -h | --help: print usage information
414 -r | --reference: generate new reference files (otherwise: compare)
415 --soffice=method:location
416 specify soffice instance to connect to
417 supported methods: 'path', 'connect'
418 --userdir=URL specify user installation directory for 'path' method
419 --valgrind pass --valgrind to soffice for 'path' method"""
420 print(message
.format(program
= os
.path
.basename(sys
.argv
[0])))
424 subprocess
.check_output(["gs", "--version"])
426 print("Cannot execute 'gs'. Please install ghostscript.")
429 subprocess
.check_output(["composite", "-version"])
430 subprocess
.check_output(["identify", "-version"])
432 print("Cannot execute 'composite' or 'identify'.")
433 print("Please install ImageMagick.")
436 if __name__
== "__main__":
438 (opts
,args
) = parseArgs(sys
.argv
)
442 if "-h" in opts
or "--help" in opts
:
445 elif "--soffice" in opts
:
446 reference
= "-r" in opts
or "--reference" in opts
447 failed
= runLoadPrintFileTests(opts
, args
, ".odt", reference
)
448 mkAllImages(args
, ".odt", "200", reference
, failed
)
450 compareAllImages(args
, ".odt")
455 # vim: set shiftwidth=4 softtabstop=4 expandtab: