# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import unittest

from catapult_base import cloud_storage
from telemetry.unittest_util import system_stub


def _FakeFindGsutil():
  return 'fake gsutil path'


def _FakeReadHash(_):
  # Arbitrary fixed hash; only equality with _FakeCalulateHashMatchesRead's
  # result matters to the tests.
  return 'hashthis!'


def _FakeCalulateHashMatchesRead(_):
  return 'hashthis!'


def _FakeCalulateHashNewHash(_):
  # Any value different from _FakeReadHash's result works here.
  return 'omgnewhash'
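

# The fakes above replace cloud_storage's gsutil lookup and hashing helpers so
# the tests below never invoke a real gsutil binary or hash files on disk.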


class CloudStorageUnitTest(unittest.TestCase):

  def _FakeRunCommand(self, cmd):
    # Stands in for cloud_storage._RunCommand; intentionally does nothing.
    pass

  def _FakeGet(self, bucket, remote_path, local_path):
    # Stands in for cloud_storage.Get; intentionally does nothing.
    pass

  def _assertRunCommandRaisesError(self, communicate_strs, error):
    stubs = system_stub.Override(cloud_storage, ['open', 'subprocess'])
    orig_find_gs_util = cloud_storage.FindGsutil
    cloud_storage.FindGsutil = _FakeFindGsutil
    stubs.open.files = {'fake gsutil path':''}
    stubs.subprocess.Popen.returncode_result = 1
    try:
      for string in communicate_strs:
        stubs.subprocess.Popen.communicate_result = ('', string)
        self.assertRaises(error, cloud_storage._RunCommand, [])
    finally:
      stubs.Restore()
      cloud_storage.FindGsutil = orig_find_gs_util
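
  # Each test below feeds a characteristic gsutil stderr message through
  # _RunCommand and checks that it is mapped to the matching exception type.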

  def testRunCommandCredentialsError(self):
    strs = ['You are attempting to access protected data with no configured',
            'Failure: No handler was ready to authenticate.']
    self._assertRunCommandRaisesError(strs, cloud_storage.CredentialsError)

  def testRunCommandPermissionError(self):
    strs = ['status=403', 'status 403', '403 Forbidden']
    self._assertRunCommandRaisesError(strs, cloud_storage.PermissionError)

  def testRunCommandNotFoundError(self):
    strs = ['InvalidUriError', 'No such object', 'No URLs matched',
            'One or more URLs matched no', 'InvalidUriError']
    self._assertRunCommandRaisesError(strs, cloud_storage.NotFoundError)

  def testRunCommandServerError(self):
    strs = ['500 Internal Server Error']
    self._assertRunCommandRaisesError(strs, cloud_storage.ServerError)

  def testRunCommandGenericError(self):
    strs = ['Random string']
    self._assertRunCommandRaisesError(strs, cloud_storage.CloudStorageError)

  def testInsertCreatesValidCloudUrl(self):
    orig_run_command = cloud_storage._RunCommand
    try:
      cloud_storage._RunCommand = self._FakeRunCommand
      remote_path = 'test-remote-path.html'
      local_path = 'test-local-path.html'
      cloud_url = cloud_storage.Insert(cloud_storage.PUBLIC_BUCKET,
                                       remote_path, local_path)
      self.assertEqual('https://console.developers.google.com/m/cloudstorage'
                       '/b/chromium-telemetry/o/test-remote-path.html',
                       cloud_url)
    finally:
      cloud_storage._RunCommand = orig_run_command
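
  # Exists() should report False when gsutil exits non-zero with a
  # "matched no objects" message.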

  def testExistsReturnsFalse(self):
    stubs = system_stub.Override(cloud_storage, ['subprocess'])
    orig_find_gs_util = cloud_storage.FindGsutil
    try:
      stubs.subprocess.Popen.communicate_result = (
          '',
          'CommandException: One or more URLs matched no objects.\n')
      stubs.subprocess.Popen.returncode_result = 1
      cloud_storage.FindGsutil = _FakeFindGsutil
      self.assertFalse(cloud_storage.Exists('fake bucket',
                                            'fake remote path'))
    finally:
      stubs.Restore()
      cloud_storage.FindGsutil = orig_find_gs_util

  def testGetIfChanged(self):
    stubs = system_stub.Override(cloud_storage, ['os', 'open'])
    stubs.open.files[_FakeFindGsutil()] = ''
    orig_get = cloud_storage.Get
    orig_read_hash = cloud_storage.ReadHash
    orig_calculate_hash = cloud_storage.CalculateHash
    cloud_storage.ReadHash = _FakeReadHash
    cloud_storage.CalculateHash = _FakeCalulateHashMatchesRead
    file_path = 'test-file-path.wpr'
    hash_path = file_path + '.sha1'
    try:
      cloud_storage.Get = self._FakeGet
      # hash_path doesn't exist.
      self.assertFalse(cloud_storage.GetIfChanged(file_path,
                                                  cloud_storage.PUBLIC_BUCKET))
      # hash_path exists, but file_path doesn't.
      stubs.os.path.files.append(hash_path)
      self.assertTrue(cloud_storage.GetIfChanged(file_path,
                                                 cloud_storage.PUBLIC_BUCKET))
      # hash_path and file_path exist, and have the same hash.
      stubs.os.path.files.append(file_path)
      self.assertFalse(cloud_storage.GetIfChanged(file_path,
                                                  cloud_storage.PUBLIC_BUCKET))
      # hash_path and file_path exist, and have different hashes.
      cloud_storage.CalculateHash = _FakeCalulateHashNewHash
      self.assertTrue(cloud_storage.GetIfChanged(file_path,
                                                 cloud_storage.PUBLIC_BUCKET))
    finally:
      stubs.Restore()
      cloud_storage.Get = orig_get
      cloud_storage.CalculateHash = orig_calculate_hash
      cloud_storage.ReadHash = orig_read_hash
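
  # GetFilesInDirectoryIfChanged should reject the filesystem root and
  # nonexistent directories, and fetch one file per .sha1 found below a
  # real directory.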

  def testGetFilesInDirectoryIfChanged(self):
    stubs = system_stub.Override(cloud_storage, ['os'])
    stubs.os._directory = {'dir1':['1file1.sha1', '1file2.txt', '1file3.sha1'],
                           'dir2':['2file.txt'], 'dir3':['3file1.sha1']}
    stubs.os.path.dirs = ['real_dir_path']
    def IncrementFilesUpdated(*_):
      IncrementFilesUpdated.files_updated += 1
    IncrementFilesUpdated.files_updated = 0
    orig_get_if_changed = cloud_storage.GetIfChanged
    cloud_storage.GetIfChanged = IncrementFilesUpdated
    try:
      self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged,
                        os.path.abspath(os.sep), cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(0, IncrementFilesUpdated.files_updated)
      self.assertRaises(ValueError, cloud_storage.GetFilesInDirectoryIfChanged,
                        'fake_dir_path', cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(0, IncrementFilesUpdated.files_updated)
      cloud_storage.GetFilesInDirectoryIfChanged('real_dir_path',
                                                 cloud_storage.PUBLIC_BUCKET)
      self.assertEqual(3, IncrementFilesUpdated.files_updated)
    finally:
      stubs.Restore()
      cloud_storage.GetIfChanged = orig_get_if_changed
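
  # Copy should hand _RunCommand a single 'cp' invocation with fully
  # qualified gs:// source and destination URLs.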

  def testCopy(self):
    orig_run_command = cloud_storage._RunCommand
    def AssertCorrectRunCommandArgs(args):
      self.assertEqual(expected_args, args)
    cloud_storage._RunCommand = AssertCorrectRunCommandArgs
    expected_args = ['cp', 'gs://bucket1/remote_path1',
                     'gs://bucket2/remote_path2']
    try:
      cloud_storage.Copy('bucket1', 'bucket2', 'remote_path1', 'remote_path2')
    finally:
      cloud_storage._RunCommand = orig_run_command