Only grant permissions to new extensions from sync if they have the expected version
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_du.py
blob2774c32df701f3f6b6c180e4eae10ff8e0a09fe5
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Tests for du command."""
17 from __future__ import absolute_import
19 import gslib.tests.testcase as testcase
20 from gslib.tests.testcase.integration_testcase import SkipForS3
21 from gslib.tests.util import ObjectToURI as suri
22 from gslib.util import Retry
25 class TestDu(testcase.GsUtilIntegrationTestCase):
26 """Integration tests for du command."""
28 def _create_nested_subdir(self):
29 """Creates a nested subdirectory for use by tests in this module."""
30 bucket_uri = self.CreateBucket()
31 obj_uris = []
32 obj_uris.append(self.CreateObject(
33 bucket_uri=bucket_uri, object_name='sub1/five', contents='5five'))
34 obj_uris.append(self.CreateObject(
35 bucket_uri=bucket_uri, object_name='sub1/four', contents='four'))
36 obj_uris.append(self.CreateObject(
37 bucket_uri=bucket_uri, object_name='sub1/sub2/five', contents='5five'))
38 obj_uris.append(self.CreateObject(
39 bucket_uri=bucket_uri, object_name='sub1/sub2/four', contents='four'))
40 self.AssertNObjectsInBucket(bucket_uri, 4)
41 return bucket_uri, obj_uris
43 def test_object(self):
44 obj_uri = self.CreateObject(contents='foo')
45 # Use @Retry as hedge against bucket listing eventual consistency.
46 @Retry(AssertionError, tries=3, timeout_secs=1)
47 def _Check():
48 stdout = self.RunGsUtil(['du', suri(obj_uri)], return_stdout=True)
49 self.assertEqual(stdout, '%-10s %s\n' % (3, suri(obj_uri)))
50 _Check()
52 def test_bucket(self):
53 bucket_uri = self.CreateBucket()
54 obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
55 # Use @Retry as hedge against bucket listing eventual consistency.
56 @Retry(AssertionError, tries=3, timeout_secs=1)
57 def _Check():
58 stdout = self.RunGsUtil(['du', suri(bucket_uri)], return_stdout=True)
59 self.assertEqual(stdout, '%-10s %s\n' % (3, suri(obj_uri)))
60 _Check()
62 def test_subdirs(self):
63 """Tests that subdirectory sizes are correctly calculated and listed."""
64 bucket_uri, obj_uris = self._create_nested_subdir()
66 # Use @Retry as hedge against bucket listing eventual consistency.
67 @Retry(AssertionError, tries=3, timeout_secs=1)
68 def _Check():
69 stdout = self.RunGsUtil(['du', suri(bucket_uri)], return_stdout=True)
70 self.assertSetEqual(set(stdout.splitlines()), set([
71 '%-10s %s' % (5, suri(obj_uris[0])),
72 '%-10s %s' % (4, suri(obj_uris[1])),
73 '%-10s %s' % (5, suri(obj_uris[2])),
74 '%-10s %s' % (4, suri(obj_uris[3])),
75 '%-10s %s/sub1/sub2/' % (9, suri(bucket_uri)),
76 '%-10s %s/sub1/' % (18, suri(bucket_uri)),
77 ]))
78 _Check()
80 def test_multi_args(self):
81 """Tests running du with multiple command line arguments."""
82 bucket_uri = self.CreateBucket()
83 obj_uri1 = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
84 obj_uri2 = self.CreateObject(bucket_uri=bucket_uri, contents='foo2')
85 # Use @Retry as hedge against bucket listing eventual consistency.
86 @Retry(AssertionError, tries=3, timeout_secs=1)
87 def _Check():
88 stdout = self.RunGsUtil(['du', suri(obj_uri1), suri(obj_uri2)],
89 return_stdout=True)
90 self.assertSetEqual(set(stdout.splitlines()), set([
91 '%-10s %s' % (3, suri(obj_uri1)),
92 '%-10s %s' % (4, suri(obj_uri2)),
93 ]))
94 _Check()
96 def test_total(self):
97 """Tests total size listing via the -c flag."""
98 bucket_uri = self.CreateBucket()
99 obj_uri1 = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
100 obj_uri2 = self.CreateObject(bucket_uri=bucket_uri, contents='zebra')
101 # Use @Retry as hedge against bucket listing eventual consistency.
102 @Retry(AssertionError, tries=3, timeout_secs=1)
103 def _Check():
104 stdout = self.RunGsUtil(['du', '-c', suri(bucket_uri)],
105 return_stdout=True)
106 self.assertSetEqual(set(stdout.splitlines()), set([
107 '%-10s %s' % (3, suri(obj_uri1)),
108 '%-10s %s' % (5, suri(obj_uri2)),
109 '%-10s total' % 8,
111 _Check()
113 def test_human_readable(self):
114 obj_uri = self.CreateObject(contents='x' * 2048)
115 # Use @Retry as hedge against bucket listing eventual consistency.
116 @Retry(AssertionError, tries=3, timeout_secs=1)
117 def _Check():
118 stdout = self.RunGsUtil(['du', '-h', suri(obj_uri)], return_stdout=True)
119 self.assertEqual(stdout, '%-10s %s\n' % ('2 KiB', suri(obj_uri)))
120 _Check()
122 def test_summary(self):
123 """Tests summary listing with the -s flag."""
124 bucket_uri1, _ = self._create_nested_subdir()
125 bucket_uri2, _ = self._create_nested_subdir()
127 # Use @Retry as hedge against bucket listing eventual consistency.
128 @Retry(AssertionError, tries=3, timeout_secs=1)
129 def _Check():
130 stdout = self.RunGsUtil([
131 'du', '-s', suri(bucket_uri1), suri(bucket_uri2)], return_stdout=True)
132 self.assertSetEqual(set(stdout.splitlines()), set([
133 '%-10s %s' % (18, suri(bucket_uri1)),
134 '%-10s %s' % (18, suri(bucket_uri2)),
136 _Check()
138 def test_subdir_summary(self):
139 """Tests summary listing with the -s flag on a subdirectory."""
140 bucket_uri1, _ = self._create_nested_subdir()
141 bucket_uri2, _ = self._create_nested_subdir()
142 subdir1 = suri(bucket_uri1, 'sub1')
143 subdir2 = suri(bucket_uri2, 'sub1')
145 # Use @Retry as hedge against bucket listing eventual consistency.
146 @Retry(AssertionError, tries=3, timeout_secs=1)
147 def _Check():
148 stdout = self.RunGsUtil(
149 ['du', '-s', subdir1, subdir2], return_stdout=True)
150 self.assertSetEqual(set(stdout.splitlines()), set([
151 '%-10s %s' % (18, subdir1),
152 '%-10s %s' % (18, subdir2),
154 _Check()
156 @SkipForS3('S3 lists versions in reverse order.')
157 def test_versioned(self):
158 """Tests listing all versions with the -a flag."""
159 bucket_uri = self.CreateVersionedBucket()
160 object_uri1 = self.CreateObject(
161 bucket_uri=bucket_uri, object_name='foo', contents='foo')
162 object_uri2 = self.CreateObject(
163 bucket_uri=bucket_uri, object_name='foo', contents='foo2')
165 # Use @Retry as hedge against bucket listing eventual consistency.
166 @Retry(AssertionError, tries=3, timeout_secs=1)
167 def _Check1():
168 stdout = self.RunGsUtil(['du', suri(bucket_uri)], return_stdout=True)
169 self.assertEqual(stdout, '%-10s %s\n' % (4, suri(object_uri2)))
170 _Check1()
172 # Use @Retry as hedge against bucket listing eventual consistency.
173 @Retry(AssertionError, tries=3, timeout_secs=1)
174 def _Check2():
175 stdout = self.RunGsUtil(['du', '-a', suri(bucket_uri)],
176 return_stdout=True)
177 self.assertSetEqual(set(stdout.splitlines()), set([
178 '%-10s %s#%s' % (
179 3, suri(object_uri1), object_uri1.generation),
180 '%-10s %s#%s' % (
181 4, suri(object_uri2), object_uri2.generation),
183 _Check2()
185 def test_null_endings(self):
186 """Tests outputting 0-endings with the -0 flag."""
187 bucket_uri = self.CreateBucket()
188 obj_uri1 = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
189 obj_uri2 = self.CreateObject(bucket_uri=bucket_uri, contents='zebra')
190 # Use @Retry as hedge against bucket listing eventual consistency.
191 @Retry(AssertionError, tries=3, timeout_secs=1)
192 def _Check():
193 stdout = self.RunGsUtil(['du', '-0c', suri(bucket_uri)],
194 return_stdout=True)
195 self.assertSetEqual(set(stdout.split('\0')), set([
196 '%-10s %s' % (3, suri(obj_uri1)),
197 '%-10s %s' % (5, suri(obj_uri2)),
198 '%-10s total' % 8,
201 _Check()
203 def test_excludes(self):
204 """Tests exclude pattern excluding certain file paths."""
205 bucket_uri, obj_uris = self._create_nested_subdir()
207 # Use @Retry as hedge against bucket listing eventual consistency.
208 @Retry(AssertionError, tries=3, timeout_secs=1)
209 def _Check():
210 stdout = self.RunGsUtil([
211 'du', '-e', '*sub2/five*', '-e', '*sub1/four',
212 suri(bucket_uri)], return_stdout=True)
213 self.assertSetEqual(set(stdout.splitlines()), set([
214 '%-10s %s' % (5, suri(obj_uris[0])),
215 '%-10s %s' % (4, suri(obj_uris[3])),
216 '%-10s %s/sub1/sub2/' % (4, suri(bucket_uri)),
217 '%-10s %s/sub1/' % (9, suri(bucket_uri)),
219 _Check()
221 def test_excludes_file(self):
222 """Tests file exclusion with the -X flag."""
223 bucket_uri, obj_uris = self._create_nested_subdir()
224 fpath = self.CreateTempFile(contents='*sub2/five*\n*sub1/four')
226 # Use @Retry as hedge against bucket listing eventual consistency.
227 @Retry(AssertionError, tries=3, timeout_secs=1)
228 def _Check():
229 stdout = self.RunGsUtil([
230 'du', '-X', fpath, suri(bucket_uri)], return_stdout=True)
231 self.assertSetEqual(set(stdout.splitlines()), set([
232 '%-10s %s' % (5, suri(obj_uris[0])),
233 '%-10s %s' % (4, suri(obj_uris[3])),
234 '%-10s %s/sub1/sub2/' % (4, suri(bucket_uri)),
235 '%-10s %s/sub1/' % (9, suri(bucket_uri)),
237 _Check()