1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Utilities for PyAuto."""
class ExistingPathReplacer(object):
  """Facilitates backing up a given path (file or dir).

  Often you want to manipulate a directory or file for testing but don't want
  to meddle with the existing contents.  This class lets you make a backup,
  and reinstate the backup when done.  A backup is made in an adjacent
  directory, so you need to make sure you have write permissions to the parent
  directory.

  Works seamlessly in cases where the requested path already exists, or not.

  Automatically reinstates the backed up path (if any) when object is deleted.
  """
  # Class-level defaults keep __del__ safe on an instance whose __init__ raised
  # before the corresponding instance attribute was assigned.
  _path = ''
  _path_type = 'dir'
  _backup_dir = None  # dir to which existing content is backed up
  _backup_basename = ''
  _created = False  # True while the entry at _path is the one made by us

  def __init__(self, path, path_type='dir'):
    """Initialize the object, making backups if necessary.

    Args:
      path: the requested path to file or directory
      path_type: path type. Options: 'file', 'dir'. Default: 'dir'

    Raises:
      AssertionError: if |path_type| is invalid, or |path| already exists but
          is not of the requested type.  Nothing on disk is touched then.
    """
    assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type
    self._path_type = path_type
    self._path = path
    if os.path.exists(self._path):
      if 'dir' == self._path_type:
        assert os.path.isdir(self._path), '%s is not a directory' % self._path
      else:
        assert os.path.isfile(self._path), '%s is not a file' % self._path
      # Back up the existing content in a fresh directory next to it.  A bare
      # relative name has an empty dirname, which means the current directory.
      self._backup_basename = os.path.basename(self._path)
      self._backup_dir = tempfile.mkdtemp(
          dir=os.path.dirname(self._path) or os.curdir,
          prefix='bkp-' + self._backup_basename)
      logging.info('Backing up %s in %s', self._path, self._backup_dir)
      shutil.move(self._path,
                  os.path.join(self._backup_dir, self._backup_basename))
    self._CreateRequestedPath()
    # Only from here on does the entry at _path belong to this object.
    self._created = True

  def __del__(self):
    """Cleanup. Reinstate backup.

    Safe to call more than once, and safe on a partially-initialized object:
    the requested path is only removed if this object created it.
    """
    if self._created:
      self._created = False
      self._CleanupRequestedPath()
    if self._backup_dir:  # Reinstate, if backed up.
      from_path = os.path.join(self._backup_dir, self._backup_basename)
      logging.info('Reinstating backup from %s to %s', from_path, self._path)
      shutil.move(from_path, self._path)
    self._RemoveBackupDir()
    # Forget the backup so a repeated cleanup cannot touch restored content.
    self._backup_dir = None

  def _CreateRequestedPath(self):
    """Create an empty file or directory (per path type) at the path."""
    # Create intermediate dirs if needed.  An empty dirname refers to the
    # current directory, which needs no creation.
    parent = os.path.dirname(self._path)
    if parent and not os.path.exists(parent):
      os.makedirs(parent)
    if 'dir' == self._path_type:
      os.mkdir(self._path)
    else:
      open(self._path, 'w').close()

  def _CleanupRequestedPath(self):
    """Remove whatever currently exists at the requested path."""
    if os.path.exists(self._path):
      if os.path.isdir(self._path):
        shutil.rmtree(self._path, ignore_errors=True)
      else:
        os.remove(self._path)

  def _RemoveBackupDir(self):
    """Delete the backup directory, if one was made and still exists."""
    if self._backup_dir and os.path.isdir(self._backup_dir):
      shutil.rmtree(self._backup_dir, ignore_errors=True)
90 """Remove the given path (file or dir)."""
91 if os
.path
.isdir(path
):
92 shutil
.rmtree(path
, ignore_errors
=True)
def UnzipFilenameToDir(filename, dir):
  """Unzip |filename| to directory |dir|.

  This works with as low as python2.4 (used on win).

  The process working directory is never changed; every member is written
  to a path joined onto |dir|.

  Args:
    filename: path to the zip archive to extract.
    dir: destination directory.  Created (single level) if it is missing.
  """
  dest_root = dir  # |dir| shadows the builtin; use a clearer local name.
  zf = zipfile.ZipFile(filename)
  try:
    if not os.path.isdir(dest_root):
      os.mkdir(dest_root)
    # Extract files.
    # NOTE(review): member names are trusted as-is; an absolute or '..' name
    # would land outside |dir|.  Do not use on untrusted archives.
    for info in zf.infolist():
      name = info.filename
      target = os.path.join(dest_root, name)
      if name.endswith('/'):  # dir
        if not os.path.isdir(target):
          os.makedirs(target)
      else:  # file
        parent = os.path.dirname(target)
        # |parent| is empty when both |dir| and the member have no directory
        # part; there is nothing to create in that case.
        if parent and not os.path.isdir(parent):
          os.makedirs(parent)
        out = open(target, 'wb')
        try:
          out.write(zf.read(name))
        finally:
          out.close()
      # Set permissions. Permission info in external_attr is shifted 16 bits.
      # Keep only the permission bits (0xFFF == octal 7777), and skip archives
      # that carry none so extracted files are not left with mode 0.
      mode = (info.external_attr >> 16) & 0xFFF
      if mode:
        os.chmod(target, mode)
  finally:
    zf.close()
def GetCurrentPlatform():
  """Get a string representation for the current platform.

  Returns:
    'mac', 'win' or 'linux'

  Raises:
    RuntimeError: if sys.platform matches none of the known platforms.
  """
  platform_id = sys.platform
  # Platforms identified by an exact sys.platform value.
  exact_names = {'darwin': 'mac', 'win32': 'win'}
  if platform_id in exact_names:
    return exact_names[platform_id]
  # Linux is matched by prefix.
  if platform_id.startswith('linux'):
    return 'linux'
  raise RuntimeError('Unknown platform')
def PrintPerfResult(graph_name, series_name, data_point, units,
                    show_on_waterfall=False):
  """Prints a line to stdout that is specially formatted for the perf bots.

  Args:
    graph_name: String name for the graph on which to plot the data.
    series_name: String name for the series (line on the graph) associated with
                 the data.  This is also the string displayed on the waterfall
                 if |show_on_waterfall| is True.
    data_point: Numeric data value to plot on the graph for the current build.
                This can be a single value or an array of values.  If an array,
                the graph will plot the average of the values, along with error
                bars.
    units: The string unit of measurement for the given |data_point|.
    show_on_waterfall: Whether or not to display this result directly on the
                       buildbot waterfall itself (in the buildbot step running
                       this test on the waterfall page, not the stdio page).
                       Any truthy value enables it.
  """
  # A leading '*' marks the result for display on the waterfall.
  waterfall_indicator = ''
  if show_on_waterfall:
    waterfall_indicator = '*'
  # Spaces are stripped from the value so that a list prints as one token.
  # sys.stdout.write is used instead of print so the line is emitted the same
  # way on every Python version.
  sys.stdout.write('%sRESULT %s: %s= %s %s\n' % (
      waterfall_indicator, graph_name, series_name,
      str(data_point).replace(' ', ''), units))
  # Flush so the result line is not held back in a buffer.
  sys.stdout.flush()
def Shard(ilist, shard_index, num_shards):
  """Shard a given list and return the group at index |shard_index|.

  The list is cut into |num_shards| consecutive chunks of equal size; any
  leftover items go to the last shard.

  Args:
    ilist: input list
    shard_index: 0-based sharding index
    num_shards: shard count

  Returns:
    The slice of |ilist| belonging to shard |shard_index|.

  Raises:
    ZeroDivisionError: if |num_shards| is 0.
  """
  # Floor division keeps the chunk size an integer under true division too;
  # a float here would make the slice below fail.
  chunk_size = len(ilist) // num_shards
  chunk_start = shard_index * chunk_size
  if shard_index == num_shards - 1:  # Exhaust the remainder in the last shard.
    chunk_end = len(ilist)
  else:
    chunk_end = chunk_start + chunk_size
  return ilist[chunk_start:chunk_end]
def WaitForDomElement(pyauto, driver, xpath):
  """Wait for the UI element to appear.

  Args:
    pyauto: an instance of pyauto.PyUITest.
    driver: an instance of chrome driver or a web element.
    xpath: the xpath of the element to wait for.

  Returns:
    The element if it is found.
    NoSuchElementException if it is not found.
  """
  def _ElementIsPresent():
    # True once at least one element matches |xpath|.
    matches = driver.find_elements_by_xpath(xpath)
    return len(matches) > 0

  # The outcome of the wait is not inspected; the lookup below is what
  # produces the element (or the driver's own error).
  pyauto.WaitUntil(_ElementIsPresent)
  return driver.find_element_by_xpath(xpath)
def DoesUrlExist(url):
  """Determines whether a resource exists at the given URL.

  Issues a HEAD request and follows permanent (301) and temporary (302)
  redirects, up to a fixed limit so that a redirect loop cannot recurse
  forever.

  Args:
    url: URL to be verified.

  Returns:
    True if url exists, otherwise False.  Connection failures, malformed
    responses, redirects without a Location header and exceeding the redirect
    limit all count as "does not exist".
  """
  max_redirects = 10
  for _ in range(max_redirects + 1):
    parsed = urlparse.urlparse(url)
    # Talk TLS to https URLs rather than probing them over plain HTTP.
    if parsed.scheme == 'https':
      connection_class = httplib.HTTPSConnection
    else:
      connection_class = httplib.HTTPConnection
    # An empty path is not a valid request target; the query is part of the
    # resource identity and must be sent along.
    target = parsed.path or '/'
    if parsed.query:
      target += '?' + parsed.query

    conn = None
    try:
      try:
        conn = connection_class(parsed.netloc)
        conn.request('HEAD', target)
        response = conn.getresponse()
        status = response.status
        location = response.getheader('location')
      except (socket.gaierror, socket.error, httplib.HTTPException):
        return False
    finally:
      # |conn| stays None if the connection object could not be built.
      if conn is not None:
        conn.close()

    # Follow both permanent (301) and temporary (302) redirects.
    if status != 301 and status != 302:
      return status == 200
    if not location:
      return False
    # Location may be relative to the URL that produced the redirect.
    url = urlparse.urljoin(url, location)
  return False
# TextTestResult is the public name of the text result class; the underscored
# alias only exists on some interpreter versions, so it is the fallback.
_TextTestResultBase = getattr(unittest, 'TextTestResult', None)
if _TextTestResultBase is None:
  _TextTestResultBase = unittest._TextTestResult


class _GTestTextTestResult(_TextTestResultBase):
  """A test result class that can print formatted text results to a stream.

  Results printed in conformance with gtest output format, like:
  [ RUN      ] autofill.AutofillTest.testAutofillInvalid: "test desc."
  [       OK ] autofill.AutofillTest.testAutofillInvalid
  [ RUN      ] autofill.AutofillTest.testFillProfile: "test desc."
  [       OK ] autofill.AutofillTest.testFillProfile
  [ RUN      ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
  [       OK ] autofill.AutofillTest.testFillProfileCrazyCharacters
  """

  def __init__(self, stream, descriptions, verbosity):
    _TextTestResultBase.__init__(self, stream, descriptions, verbosity)

  def _GetTestURI(self, test):
    """Return 'module.Class.method' for |test|.

    Falls back to str(test) for objects that carry no test method name.
    """
    test_class = test.__class__
    # Built by hand rather than through a private unittest helper.
    class_uri = '%s.%s' % (test_class.__module__, test_class.__name__)
    method_name = getattr(test, '_testMethodName', None)
    if method_name is None:
      # Python <= 2.4 keeps the method name in a name-mangled attribute.
      method_name = getattr(test, '_TestCase__testMethodName', None)
    if method_name is None:
      return str(test)
    return '%s.%s' % (class_uri, method_name)

  def getDescription(self, test):
    """Return the test URI followed by its quoted short description."""
    return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())

  # The methods below call unittest.TestResult directly so that only the
  # bookkeeping runs and the text result's own progress output is skipped;
  # the gtest-style line is written instead.

  def startTest(self, test):
    unittest.TestResult.startTest(self, test)
    self.stream.writeln('[ RUN      ] %s' % self.getDescription(test))

  def addSuccess(self, test):
    unittest.TestResult.addSuccess(self, test)
    self.stream.writeln('[       OK ] %s' % self._GetTestURI(test))

  def addError(self, test, err):
    unittest.TestResult.addError(self, test, err)
    self.stream.writeln('[    ERROR ] %s' % self._GetTestURI(test))

  def addFailure(self, test, err):
    unittest.TestResult.addFailure(self, test, err)
    self.stream.writeln('[   FAILED ] %s' % self._GetTestURI(test))
class GTestTextTestRunner(unittest.TextTestRunner):
  """Test Runner for displaying test results in textual format.

  Results are displayed in conformance with gtest output.
  """

  def __init__(self, verbosity=1):
    # Results always go to stderr; only the verbosity is configurable.
    runner_options = {'stream': sys.stderr, 'verbosity': verbosity}
    unittest.TextTestRunner.__init__(self, **runner_options)

  def _makeResult(self):
    # Hand the runner's own stream and settings to the gtest-style result.
    stream = self.stream
    descriptions = self.descriptions
    verbosity = self.verbosity
    return _GTestTextTestResult(stream, descriptions, verbosity)