Implement nacl_irt_memory for non-sfi mode.
[chromium-blink-merge.git] / chrome / test / pyautolib / pyauto_utils.py
blob8b16a8ea3dfb2d67bd658abf9db88abdea16c2a1
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Utilities for PyAuto."""
7 import httplib
8 import logging
9 import os
10 import shutil
11 import socket
12 import sys
13 import tempfile
14 import unittest
15 import urlparse
16 import zipfile
class ExistingPathReplacer(object):
  """Facilitates backing up a given path (file or dir).

  Often you want to manipulate a directory or file for testing but don't want
  to meddle with the existing contents. This class lets you make a backup, and
  reinstate the backup when done. A backup is made in an adjacent directory,
  so you need to make sure you have write permissions to the parent directory.

  Works seamlessly in cases where the requested path already exists, or not.

  Automatically reinstates the backed up path (if any) when object is deleted.
  """
  _path = ''
  _backup_dir = None  # dir to which existing content is backed up
  _backup_basename = ''
  # Becomes True only after any pre-existing content has been moved into the
  # backup dir. __del__ also runs on partially constructed objects (e.g. when
  # an assertion in __init__ fails) and must not delete content that was never
  # backed up.
  _owns_path = False

  def __init__(self, path, path_type='dir'):
    """Initialize the object, making backups if necessary.

    Args:
      path: the requested path to file or directory
      path_type: path type. Options: 'file', 'dir'. Default: 'dir'

    Raises:
      AssertionError: if |path_type| is invalid, or |path| already exists but
          is not of the requested type. The existing path is left untouched.
    """
    assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type
    self._path_type = path_type
    self._path = path
    if os.path.exists(self._path):
      if 'dir' == self._path_type:
        assert os.path.isdir(self._path), '%s is not a directory' % self._path
      else:
        assert os.path.isfile(self._path), '%s is not a file' % self._path
      # take a backup
      self._backup_basename = os.path.basename(self._path)
      self._backup_dir = tempfile.mkdtemp(
          dir=os.path.dirname(self._path),
          prefix='bkp-' + self._backup_basename)
      logging.info('Backing up %s in %s', self._path, self._backup_dir)
      shutil.move(self._path,
                  os.path.join(self._backup_dir, self._backup_basename))
    # From here on whatever lives at self._path was created by this object.
    self._owns_path = True
    self._CreateRequestedPath()

  def __del__(self):
    """Cleanup. Reinstate backup."""
    if not self._owns_path:
      # Construction failed before the original content was moved aside;
      # anything at self._path is the caller's data, so leave it alone.
      return
    self._CleanupRequestedPath()
    if self._backup_dir:  # Reinstate, if backed up.
      from_path = os.path.join(self._backup_dir, self._backup_basename)
      logging.info('Reinstating backup from %s to %s', from_path, self._path)
      shutil.move(from_path, self._path)
    self._RemoveBackupDir()

  def _CreateRequestedPath(self):
    """Create an empty file or directory at the requested path."""
    # Create intermediate dirs if needed. A bare relative name such as 'foo'
    # has an empty dirname (meaning the current directory), which must not be
    # handed to os.makedirs because that raises.
    parent = os.path.dirname(self._path)
    if parent and not os.path.exists(parent):
      os.makedirs(parent)
    if 'dir' == self._path_type:
      os.mkdir(self._path)
    else:
      open(self._path, 'w').close()

  def _CleanupRequestedPath(self):
    """Remove whatever currently exists at the requested path."""
    if os.path.exists(self._path):
      if os.path.isdir(self._path):
        shutil.rmtree(self._path, ignore_errors=True)
      else:
        os.remove(self._path)

  def _RemoveBackupDir(self):
    """Delete the backup directory, if one was created."""
    if self._backup_dir and os.path.isdir(self._backup_dir):
      shutil.rmtree(self._backup_dir, ignore_errors=True)
def RemovePath(path):
  """Delete |path|, whether it is a file or a directory tree.

  Directories are removed recursively with errors ignored. For anything else
  an OSError from the removal (for example, the path not existing) is
  swallowed, so the call is best-effort and never raises OSError.
  """
  if not os.path.isdir(path):
    try:
      os.remove(path)
    except OSError:
      pass
  else:
    shutil.rmtree(path, ignore_errors=True)
def UnzipFilenameToDir(filename, dir):
  """Unzip |filename| to directory |dir|.

  This works with as low as python2.4 (used on win).

  Entries are written to paths joined onto |dir| rather than by changing the
  process working directory, so a failure part-way through cannot leave the
  caller in a different cwd.

  Args:
    filename: path of the zip archive to extract.
    dir: destination directory. Created if missing (its parent must exist).
  """
  zf = zipfile.ZipFile(filename)
  try:
    if not os.path.isdir(dir):
      os.mkdir(dir)
    # Extract files.
    # NOTE(review): entry names are used as-is, so an archive containing
    # absolute or '..' names can write outside |dir|. Only use on trusted
    # archives.
    for info in zf.infolist():
      name = info.filename
      target = os.path.join(dir, name)
      if name.endswith('/'):  # dir
        if not os.path.isdir(target):
          os.makedirs(target)
        continue
      # file
      parent = os.path.dirname(target)
      # |parent| is empty only when both |dir| and the entry's dirname are
      # empty; os.makedirs('') would raise.
      if parent and not os.path.isdir(parent):
        os.makedirs(parent)
      out = open(target, 'wb')
      try:
        out.write(zf.read(name))
      finally:
        out.close()
      # Set permissions. Permission info in external_attr is shifted 16 bits.
      # Archives without Unix permission info carry 0 here; applying that
      # would make the extracted file inaccessible, so keep the default mode.
      mode = info.external_attr >> 16
      if mode:
        os.chmod(target, mode)
  finally:
    zf.close()
def GetCurrentPlatform():
  """Map sys.platform onto a short platform name.

  Returns:
    'mac' for 'darwin', 'win' for 'win32', and 'linux' for any value that
    starts with 'linux'.

  Raises:
    RuntimeError: if sys.platform matches none of the above.
  """
  exact_names = {'darwin': 'mac', 'win32': 'win'}
  current = sys.platform
  if current in exact_names:
    return exact_names[current]
  if not current.startswith('linux'):
    raise RuntimeError('Unknown platform')
  return 'linux'
def PrintPerfResult(graph_name, series_name, data_point, units,
                    show_on_waterfall=False):
  """Prints a line to stdout that is specially formatted for the perf bots.

  Args:
    graph_name: String name for the graph on which to plot the data.
    series_name: String name for the series (line on the graph) associated with
                 the data. This is also the string displayed on the waterfall
                 if |show_on_waterfall| is True.
    data_point: Numeric data value to plot on the graph for the current build.
                This can be a single value or an array of values. If an array,
                the graph will plot the average of the values, along with error
                bars.
    units: The string unit of measurement for the given |data_point|.
    show_on_waterfall: Whether or not to display this result directly on the
                       buildbot waterfall itself (in the buildbot step running
                       this test on the waterfall page, not the stdio page).
                       Any truthy value enables it.
  """
  # Test truthiness instead of indexing a two-element list with the flag, so
  # values such as 2 or 'yes' do not raise.
  if show_on_waterfall:
    waterfall_indicator = '*'
  else:
    waterfall_indicator = ''
  # Spaces are stripped from the value so a list renders as one token.
  # sys.stdout.write emits the same bytes as the print statement did while
  # also being valid syntax on Python 3.
  sys.stdout.write('%sRESULT %s: %s= %s %s\n' % (
      waterfall_indicator, graph_name, series_name,
      str(data_point).replace(' ', ''), units))
  sys.stdout.flush()
def Shard(ilist, shard_index, num_shards):
  """Shard a given list and return the group at index |shard_index|.

  Every shard except the last holds len(ilist) // num_shards items; the last
  shard additionally takes whatever remains. When there are more shards than
  items, all shards but the last are therefore empty.

  Args:
    ilist: input list
    shard_index: 0-based sharding index
    num_shards: shard count

  Returns:
    The slice of |ilist| belonging to shard |shard_index|.

  Raises:
    ZeroDivisionError: if |num_shards| is 0.
  """
  # Floor division keeps the slice bounds integral regardless of whether true
  # division is in effect.
  chunk_size = len(ilist) // num_shards
  chunk_start = shard_index * chunk_size
  if shard_index == num_shards - 1:  # Exhaust the remainder in the last shard.
    chunk_end = len(ilist)
  else:
    chunk_end = chunk_start + chunk_size
  return ilist[chunk_start:chunk_end]
def WaitForDomElement(pyauto, driver, xpath):
  """Block until an element matching |xpath| shows up, then return it.

  Args:
    pyauto: an instance of pyauto.PyUITest; its WaitUntil() is used to poll.
    driver: an instance of chrome driver or a web element.
    xpath: the xpath of the element to wait for.

  Returns:
    Whatever driver.find_element_by_xpath(xpath) returns once waiting ends.

  Raises:
    Presumably NoSuchElementException from the driver if the element is still
    absent after the wait -- the result of WaitUntil() is not checked here.
  """
  def _ElementIsPresent():
    matches = driver.find_elements_by_xpath(xpath)
    return len(matches) > 0

  pyauto.WaitUntil(_ElementIsPresent)
  return driver.find_element_by_xpath(xpath)
def DoesUrlExist(url):
  """Determines whether a resource exists at the given URL.

  Sends a HEAD request (over HTTPS when the URL scheme is 'https', plain HTTP
  otherwise) and follows permanent (301) and temporary (302) redirects. The
  number of redirects followed is capped so a redirect loop cannot recurse
  without bound.

  Args:
    url: URL to be verified.

  Returns:
    True if url exists (the final response status is 200), otherwise False.
    Name-resolution and socket errors, a redirect without a Location header,
    and exceeding the redirect cap all yield False.
  """
  max_redirects = 10
  for _ in range(max_redirects + 1):
    parsed = urlparse.urlparse(url)
    if parsed.scheme == 'https':
      connection_class = httplib.HTTPSConnection
    else:
      connection_class = httplib.HTTPConnection
    # The request target must be non-empty and must keep the query string.
    request_target = parsed.path or '/'
    if parsed.query:
      request_target = '%s?%s' % (request_target, parsed.query)
    # Pre-bind so the finally clause is safe even if the constructor raises.
    conn = None
    try:
      conn = connection_class(parsed.netloc)
      conn.request('HEAD', request_target)
      response = conn.getresponse()
    except (socket.gaierror, socket.error):
      return False
    finally:
      if conn is not None:
        conn.close()
    # Follow both permanent (301) and temporary (302) redirects.
    if response.status not in (301, 302):
      return response.status == 200
    location = response.getheader('location')
    if not location:
      return False
    # Location may be relative; resolve it against the URL just requested.
    url = urlparse.urljoin(url, location)
  return False
# unittest.TextTestResult is the public name where it exists; fall back to the
# private _TextTestResult spelling for interpreters that only provide that.
_TextTestResultBase = getattr(unittest, 'TextTestResult', None)
if _TextTestResultBase is None:
  _TextTestResultBase = unittest._TextTestResult


class _GTestTextTestResult(_TextTestResultBase):
  """A test result class that can print formatted text results to a stream.

  Results printed in conformance with gtest output format, like:
  [ RUN ] autofill.AutofillTest.testAutofillInvalid: "test desc."
  [ OK ] autofill.AutofillTest.testAutofillInvalid
  [ RUN ] autofill.AutofillTest.testFillProfile: "test desc."
  [ OK ] autofill.AutofillTest.testFillProfile
  [ RUN ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
  [ OK ] autofill.AutofillTest.testFillProfileCrazyCharacters
  """
  # NOTE(review): gtest itself pads these tags to a fixed width (for example
  # '[ RUN      ]'). Confirm the single-space tags below are intended.

  def __init__(self, stream, descriptions, verbosity):
    _TextTestResultBase.__init__(self, stream, descriptions, verbosity)

  def _GetTestURI(self, test):
    """Return 'module.Class.method' for |test|."""
    test_class = test.__class__
    # Python <= 2.4 stored the method name under a name-mangled attribute.
    if sys.version_info[:2] <= (2, 4):
      method_name = test._TestCase__testMethodName
    else:
      method_name = test._testMethodName
    # Built directly from the class instead of via the private
    # unittest._strclass helper, which is not available on every Python.
    return '%s.%s.%s' % (test_class.__module__, test_class.__name__,
                         method_name)

  def getDescription(self, test):
    """Return the test URI followed by its quoted short description."""
    return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())

  # Each hook below calls the plain TestResult implementation for
  # bookkeeping and then writes its own gtest-style line to the stream.

  def startTest(self, test):
    unittest.TestResult.startTest(self, test)
    self.stream.writeln('[ RUN ] %s' % self.getDescription(test))

  def addSuccess(self, test):
    unittest.TestResult.addSuccess(self, test)
    self.stream.writeln('[ OK ] %s' % self._GetTestURI(test))

  def addError(self, test, err):
    unittest.TestResult.addError(self, test, err)
    self.stream.writeln('[ ERROR ] %s' % self._GetTestURI(test))

  def addFailure(self, test, err):
    unittest.TestResult.addFailure(self, test, err)
    self.stream.writeln('[ FAILED ] %s' % self._GetTestURI(test))
class GTestTextTestRunner(unittest.TextTestRunner):
  """Text test runner whose per-test output follows the gtest layout.

  Output always goes to sys.stderr; results are collected by
  _GTestTextTestResult.
  """

  def __init__(self, verbosity=1):
    runner_options = {'stream': sys.stderr, 'verbosity': verbosity}
    unittest.TextTestRunner.__init__(self, **runner_options)

  def _makeResult(self):
    # Hook used by TextTestRunner to obtain its result object.
    result = _GTestTextTestResult(self.stream, self.descriptions,
                                  self.verbosity)
    return result