base/threading: remove ScopedTracker placed for experiments
[chromium-blink-merge.git] / tools / telemetry / catapult_base / cloud_storage_unittest.py
blob8d8a9a9dcf8ce0879160957e10c58455d47f9927
1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 import os
6 import unittest
8 from catapult_base import cloud_storage
9 from telemetry.testing import system_stub
def _FakeReadHash(_):
  """Fake used in place of cloud_storage.ReadHash; ignores its argument.

  Returns:
    The fixed string 'hashthis!'.
  """
  fake_stored_hash = 'hashthis!'
  return fake_stored_hash
def _FakeCalulateHashMatchesRead(_):
  """Fake used in place of cloud_storage.CalculateHash; ignores its argument.

  Returns:
    The fixed string 'hashthis!', i.e. the same value _FakeReadHash returns,
    so the two hashes compare equal.
  """
  matching_hash = 'hashthis!'
  return matching_hash
def _FakeCalulateHashNewHash(_):
  """Fake used in place of cloud_storage.CalculateHash; ignores its argument.

  Returns:
    The fixed string 'omgnewhash', which differs from the value
    _FakeReadHash returns, so the two hashes compare unequal.
  """
  differing_hash = 'omgnewhash'
  return differing_hash
class CloudStorageUnitTest(unittest.TestCase):
  """Unit tests for the cloud_storage module.

  The tests never talk to a real bucket: they either swap cloud_storage
  attributes for fakes or use system_stub.Override to stub out modules that
  cloud_storage uses. Every replacement is undone in a finally clause so a
  failing test does not leak state into the others.
  """

  def _FakeRunCommand(self, cmd):
    """No-op replacement for cloud_storage._RunCommand."""
    pass

  def _FakeGet(self, bucket, remote_path, local_path):
    """No-op replacement for cloud_storage.Get."""
    pass

  def _assertRunCommandRaisesError(self, communicate_strs, error):
    """Asserts that _RunCommand raises |error| for each given output string.

    Args:
      communicate_strs: Strings to use, one at a time, as the second element
          of the stubbed Popen communicate_result tuple.
      error: The exception class cloud_storage._RunCommand must raise.
    """
    stubs = system_stub.Override(cloud_storage, ['open', 'subprocess'])
    try:
      # Configure the stubs inside the try so they are restored even if the
      # configuration itself fails.
      stubs.open.files = {'fake gsutil path':''}
      stubs.subprocess.Popen.returncode_result = 1
      for string in communicate_strs:
        stubs.subprocess.Popen.communicate_result = ('', string)
        self.assertRaises(error, cloud_storage._RunCommand, [])
    finally:
      stubs.Restore()

  def testRunCommandCredentialsError(self):
    """Credential-related output must raise CredentialsError."""
    strs = ['You are attempting to access protected data with no configured',
            'Failure: No handler was ready to authenticate.']
    self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError)

  def testRunCommandPermissionError(self):
    """403-style output must raise PermissionError."""
    strs = ['status=403', 'status 403', '403 Forbidden']
    self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError)

  def testRunCommandNotFoundError(self):
    """Missing-object output must raise NotFoundError."""
    strs = ['InvalidUriError', 'No such object', 'No URLs matched',
            'One or more URLs matched no']
    self._assertRunCommandRaisesError(strs, cloud_storage.NotFoundError)

  def testRunCommandServerError(self):
    """Server-error output must raise ServerError."""
    strs = ['500 Internal Server Error']
    self._assertRunCommandRaisesError(strs, cloud_storage.ServerError)

  def testRunCommandGenericError(self):
    """Unrecognized output must raise the generic CloudStorageError."""
    strs = ['Random string']
    self._assertRunCommandRaisesError(strs, cloud_storage.CloudStorageError)

  def testInsertCreatesValidCloudUrl(self):
    """Insert must return the expected console URL for the uploaded object."""
    orig_run_command = cloud_storage._RunCommand
    try:
      cloud_storage._RunCommand = self._FakeRunCommand
      remote_path = 'test-remote-path.html'
      local_path = 'test-local-path.html'
      cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET,
                                       remote_path, local_path)
      self.assertEqual('https://console.developers.google.com/m/cloudstorage'
                       '/b/chromium-telemetry/o/test-remote-path.html',
                       cloud_url)
    finally:
      cloud_storage._RunCommand = orig_run_command

  def testExistsReturnsFalse(self):
    """Exists must return False when the command reports no matching objects."""
    stubs = system_stub.Override(cloud_storage, ['subprocess'])
    try:
      stubs.subprocess.Popen.communicate_result = (
          '',
          'CommandException: One or more URLs matched no objects.\n')
      stubs.subprocess.Popen.returncode_result = 1
      self.assertFalse(cloud_storage.Exists('fake bucket',
                                            'fake remote path'))
    finally:
      stubs.Restore()

  def testGetIfChanged(self):
    """Checks the GetIfChanged result for each hash-file/data-file state."""
    stubs = system_stub.Override(cloud_storage, ['os', 'open'])
    orig_get = cloud_storage.Get
    orig_read_hash = cloud_storage.ReadHash
    orig_calculate_hash = cloud_storage.CalculateHash
    file_path = 'test-file-path.wpr'
    hash_path = file_path + '.sha1'
    try:
      # All monkeypatching happens inside the try so the finally clause
      # always puts the real functions back.
      cloud_storage.ReadHash = _FakeReadHash
      cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead
      cloud_storage.Get = self._FakeGet
      # hash_path doesn't exist.
      self.assertFalse(cloud_storage.GetIfChanged(file_path,
                                                  cloud_storage.PUBLIC_BUCKET))
      # hash_path exists, but file_path doesn't.
      stubs.os.path.files.append(hash_path)
      self.assertTrue(cloud_storage.GetIfChanged(file_path,
                                                 cloud_storage.PUBLIC_BUCKET))
      # hash_path and file_path exist, and have same hash.
      stubs.os.path.files.append(file_path)
      self.assertFalse(cloud_storage.GetIfChanged(file_path,
                                                  cloud_storage.PUBLIC_BUCKET))
      # hash_path and file_path exist, and have different hashes.
      cloud_storage.CalculateHash = _FakeCalulateHashNewHash
      self.assertTrue(cloud_storage.GetIfChanged(file_path,
                                                 cloud_storage.PUBLIC_BUCKET))
    finally:
      stubs.Restore()
      cloud_storage.Get = orig_get
      cloud_storage.CalculateHash = orig_calculate_hash
      cloud_storage.ReadHash = orig_read_hash

  def testGetFilesInDirectoryIfChanged(self):
    """Checks directory validation and the number of GetIfChanged calls."""
    def IncrementFilesUpdated(*_):
      IncrementFilesUpdated.files_updated += 1
    IncrementFilesUpdated.files_updated = 0

    stubs = system_stub.Override(cloud_storage, ['os'])
    orig_get_if_changed = cloud_storage.GetIfChanged
    try:
      stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'],
                             'dir2':['2file.txt'], 'dir3':['3file1.sha1']}
      stubs.os.path.dirs = ['real_dir_path']
      cloud_storage.GetIfChanged = IncrementFilesUpdated
      # The filesystem root is rejected and nothing is fetched.
      self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged,
                        os.path.abspath(os.sep), cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(0, IncrementFilesUpdated.files_updated)
      # A path not listed in stubs.os.path.dirs is rejected as well.
      self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged,
                        'fake_dir_path', cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(0, IncrementFilesUpdated.files_updated)
      # The fake listing holds three '.sha1' entries; presumably each one
      # triggers a single GetIfChanged call.
      cloud_storage.GetFilesInDirectoryIfChanged('real_dir_path',
                                                 cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(3, IncrementFilesUpdated.files_updated)
    finally:
      cloud_storage.GetIfChanged = orig_get_if_changed
      stubs.Restore()

  def testCopy(self):
    """Copy must invoke _RunCommand with a 'cp' between the two gs:// URLs."""
    expected_args = ['cp', 'gs://bucket1/remote_path1',
                     'gs://bucket2/remote_path2']
    observed_calls = []
    def AssertCorrectRunCommandArgs(args):
      observed_calls.append(args)
      self.assertEqual(expected_args, args)

    orig_run_command = cloud_storage._RunCommand
    try:
      cloud_storage._RunCommand = AssertCorrectRunCommandArgs
      cloud_storage.Copy('bucket1', 'bucket2', 'remote_path1', 'remote_path2')
    finally:
      cloud_storage._RunCommand = orig_run_command
    # Without this check the test would pass even if Copy never ran a command,
    # because the argument assertion lives inside the fake.
    self.assertTrue(observed_calls, 'Copy did not call _RunCommand')