Update WebCore.FeatureObserver histogram owner.
[chromium-blink-merge.git] / tools / telemetry / catapult_base / cloud_storage_unittest.py
blobfb7452248ba17501d95750dca4835ec2037ea87d
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5 import os
6 import unittest
8 from catapult_base import cloud_storage
9 from telemetry.unittest_util import system_stub
12 def _FakeFindGsutil():
13 return 'fake gsutil path'
15 def _FakeReadHash(_):
16 return 'hashthis!'
18 def _FakeCalulateHashMatchesRead(_):
19 return 'hashthis!'
21 def _FakeCalulateHashNewHash(_):
22 return 'omgnewhash'
class CloudStorageUnitTest(unittest.TestCase):
  """Unit tests for cloud_storage, run against fakes.

  Each test replaces cloud_storage attributes (FindGsutil, _RunCommand, Get,
  ReadHash, CalculateHash, GetIfChanged) and/or cloud_storage's view of 'os',
  'open' and 'subprocess' with stand-ins, and puts the originals back in a
  finally clause so a failing test cannot leak patched state into later ones.
  """

  def _FakeRunCommand(self, cmd):
    """No-op replacement for cloud_storage._RunCommand."""

  def _FakeGet(self, bucket, remote_path, local_path):
    """No-op replacement for cloud_storage.Get."""

  def _assertRunCommandRaisesError(self, communicate_strs, error):
    """Asserts that _RunCommand raises |error| for every given output string.

    The stubbed Popen is set to report return code 1.

    Args:
      communicate_strs: Strings used, one at a time, as the second element of
          the stubbed Popen.communicate() result (the stderr slot of the real
          subprocess API).
      error: Exception class cloud_storage._RunCommand is expected to raise
          for each of those strings.
    """
    stubs = system_stub.Override(cloud_storage, ['open', 'subprocess'])
    orig_find_gs_util = cloud_storage.FindGsutil
    try:
      # All patching happens inside the try so that the finally clause undoes
      # it even if configuring the stubs fails part-way.
      cloud_storage.FindGsutil = _FakeFindGsutil
      stubs.open.files = {'fake gsutil path':''}
      stubs.subprocess.Popen.returncode_result = 1
      for stderr_text in communicate_strs:
        stubs.subprocess.Popen.communicate_result = ('', stderr_text)
        self.assertRaises(error, cloud_storage._RunCommand, [])
    finally:
      stubs.Restore()
      cloud_storage.FindGsutil = orig_find_gs_util

  def testRunCommandCredentialsError(self):
    """Missing-credentials messages must raise CredentialsError."""
    strs = ['You are attempting to access protected data with no configured',
            'Failure: No handler was ready to authenticate.']
    self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError)

  def testRunCommandPermissionError(self):
    """403 messages must raise PermissionError."""
    strs = ['status=403', 'status 403', '403 Forbidden']
    self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError)

  def testRunCommandNotFoundError(self):
    """Missing-object messages must raise NotFoundError."""
    strs = ['InvalidUriError', 'No such object', 'No URLs matched',
            'One or more URLs matched no']
    self._assertRunCommandRaisesError(strs, cloud_storage.NotFoundError)

  def testRunCommandServerError(self):
    """A 500 message must raise ServerError."""
    strs = ['500 Internal Server Error']
    self._assertRunCommandRaisesError(strs, cloud_storage.ServerError)

  def testRunCommandGenericError(self):
    """An unrecognized message must raise CloudStorageError."""
    strs = ['Random string']
    self._assertRunCommandRaisesError(strs, cloud_storage.CloudStorageError)

  def testInsertCreatesValidCloudUrl(self):
    """Insert must return the expected console URL for the uploaded object."""
    orig_run_command = cloud_storage._RunCommand
    try:
      cloud_storage._RunCommand = self._FakeRunCommand
      remote_path = 'test-remote-path.html'
      local_path = 'test-local-path.html'
      cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET,
                                       remote_path, local_path)
      self.assertEqual('https://console.developers.google.com/m/cloudstorage'
                       '/b/chromium-telemetry/o/test-remote-path.html',
                       cloud_url)
    finally:
      cloud_storage._RunCommand = orig_run_command

  def testExistsReturnsFalse(self):
    """Exists must return False when the command reports no matching objects."""
    stubs = system_stub.Override(cloud_storage, ['subprocess'])
    orig_find_gs_util = cloud_storage.FindGsutil
    try:
      stubs.subprocess.Popen.communicate_result = (
          '',
          'CommandException: One or more URLs matched no objects.\n')
      stubs.subprocess.Popen.returncode_result = 1
      cloud_storage.FindGsutil = _FakeFindGsutil
      self.assertFalse(cloud_storage.Exists('fake bucket',
                                            'fake remote path'))
    finally:
      stubs.Restore()
      cloud_storage.FindGsutil = orig_find_gs_util

  def testGetIfChanged(self):
    """GetIfChanged must report True only when a download is needed.

    Walks through four states of the stubbed file system: no hash file, hash
    file only, both files with equal hashes, and both files with different
    hashes.
    """
    stubs = system_stub.Override(cloud_storage, ['os', 'open'])
    orig_get = cloud_storage.Get
    orig_read_hash = cloud_storage.ReadHash
    orig_calculate_hash = cloud_storage.CalculateHash
    file_path = 'test-file-path.wpr'
    hash_path = file_path + '.sha1'
    try:
      # All patching happens inside the try so that the finally clause undoes
      # it even if one of these assignments fails.
      stubs.open.files[_FakeFindGsutil()] = ''
      cloud_storage.ReadHash = _FakeReadHash
      cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead
      cloud_storage.Get = self._FakeGet
      # hash_path doesn't exist.
      self.assertFalse(cloud_storage.GetIfChanged(file_path,
                                                  cloud_storage.PUBLIC_BUCKET))
      # hash_path exists, but file_path doesn't.
      stubs.os.path.files.append(hash_path)
      self.assertTrue(cloud_storage.GetIfChanged(file_path,
                                                 cloud_storage.PUBLIC_BUCKET))
      # hash_path and file_path exist, and have same hash.
      stubs.os.path.files.append(file_path)
      self.assertFalse(cloud_storage.GetIfChanged(file_path,
                                                  cloud_storage.PUBLIC_BUCKET))
      # hash_path and file_path exist, and have different hashes.
      cloud_storage.CalculateHash = _FakeCalulateHashNewHash
      self.assertTrue(cloud_storage.GetIfChanged(file_path,
                                                 cloud_storage.PUBLIC_BUCKET))
    finally:
      stubs.Restore()
      cloud_storage.Get = orig_get
      cloud_storage.CalculateHash = orig_calculate_hash
      cloud_storage.ReadHash = orig_read_hash

  def testGetFilesInDirectoryIfChanged(self):
    """GetFilesInDirectoryIfChanged must reject bad dirs and visit real ones.

    GetIfChanged is replaced by a counter. The stubbed directory listing
    contains three '.sha1' entries, which matches the three calls expected
    for 'real_dir_path' (presumably one call per hash file; confirm against
    cloud_storage).
    """
    stubs = system_stub.Override(cloud_storage, ['os'])
    def IncrementFilesUpdated(*_):
      IncrementFilesUpdated.files_updated += 1
    IncrementFilesUpdated.files_updated = 0
    orig_get_if_changed = cloud_storage.GetIfChanged
    try:
      stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'],
                             'dir2':['2file.txt'], 'dir3':['3file1.sha1']}
      stubs.os.path.dirs = ['real_dir_path']
      cloud_storage.GetIfChanged = IncrementFilesUpdated
      # The file system root is rejected.
      self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged,
                        os.path.abspath(os.sep), cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(0, IncrementFilesUpdated.files_updated)
      # A path that is not in the stubbed directory list is rejected.
      self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged,
                        'fake_dir_path', cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(0, IncrementFilesUpdated.files_updated)
      cloud_storage.GetFilesInDirectoryIfChanged('real_dir_path',
                                                 cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(3, IncrementFilesUpdated.files_updated)
    finally:
      cloud_storage.GetIfChanged = orig_get_if_changed
      stubs.Restore()

  def testCopy(self):
    """Copy must invoke _RunCommand with a 'cp' between the two gs:// URLs."""
    orig_run_command = cloud_storage._RunCommand
    expected_args = ['cp', 'gs://bucket1/remote_path1',
                     'gs://bucket2/remote_path2']
    run_command_calls = []
    def RecordRunCommandArgs(args):
      run_command_calls.append(list(args))
    try:
      cloud_storage._RunCommand = RecordRunCommandArgs
      cloud_storage.Copy('bucket1', 'bucket2', 'remote_path1', 'remote_path2')
    finally:
      cloud_storage._RunCommand = orig_run_command
    # Asserting only inside the fake would pass silently if Copy never called
    # _RunCommand at all, so require at least one recorded call.
    self.assertTrue(run_command_calls, 'Copy never invoked _RunCommand')
    for args in run_command_calls:
      self.assertEqual(expected_args, args)