Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_cat.py
blob43b0ca521a427e659d24c27a29b3ae83056aaa68
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Tests for cat command."""
17 from __future__ import absolute_import
19 import gslib.tests.testcase as testcase
20 from gslib.tests.util import ObjectToURI as suri
21 from gslib.tests.util import RUN_S3_TESTS
class TestCat(testcase.GsUtilIntegrationTestCase):
  """Integration tests exercising the gsutil cat command."""

  def test_cat_range(self):
    """Checks cat -r against both malformed and well-formed ranges."""
    object_uri = self.CreateObject(contents='0123456789')

    # Every malformed range must exit with status 1 and report the problem
    # on stderr.
    for bad_range in ('-r -', '-r a-b', '-r 1-2-3', '-r 1.7-3'):
      stderr = self.RunGsUtil(['cat', bad_range, suri(object_uri)],
                              return_stderr=True, expected_status=1)
      self.assertIn('Invalid range', stderr)

    # Well-formed ranges paired with the slice of '0123456789' that the
    # command is expected to write to stdout.
    valid_cases = (
        ('-r 1-3', '123'),
        ('-r 8-', '89'),
        ('-r -3', '789'),
    )
    for range_arg, expected_output in valid_cases:
      stdout = self.RunGsUtil(['cat', range_arg, suri(object_uri)],
                              return_stdout=True)
      self.assertEqual(expected_output, stdout)

  def test_cat_version(self):
    """Checks cat against live and version-specific object URIs."""
    versioned_bucket = self.CreateVersionedBucket()

    # Write the same object name twice so the bucket holds two versions.
    first_version = self.CreateObject(bucket_uri=versioned_bucket,
                                      contents='data1')
    second_version = self.CreateObject(
        bucket_uri=versioned_bucket,
        object_name=first_version.object_name,
        contents='data2')

    # A version-less URI should resolve to the most recently written data.
    live_output = self.RunGsUtil(['cat', suri(first_version)],
                                 return_stdout=True)
    self.assertEqual('data2', live_output)

    # Each version-specific URI should return that version's own contents.
    for version, expected_output in ((first_version, 'data1'),
                                     (second_version, 'data2')):
      version_output = self.RunGsUtil(['cat', version.version_specific_uri],
                                      return_stdout=True)
      self.assertEqual(expected_output, version_output)

    # Corrupt the version id by appending characters; the expected failure
    # message depends on which provider the suite runs against.
    if RUN_S3_TESTS:
      # S3 GETs of invalid versions return 400s.
      # Also, appending between 1 and 3 characters to the version_id can
      # result in a success (200) response from the server, hence the
      # longer suffix.
      bogus_suffix = '23456'
      expected_error = 'BadRequestException: 400'
    else:
      # Attempting to cat an invalid version should result in an error.
      bogus_suffix = '23'
      expected_error = 'No URLs matched'
    stderr = self.RunGsUtil(
        ['cat', second_version.version_specific_uri + bogus_suffix],
        return_stderr=True, expected_status=1)
    self.assertIn(expected_error, stderr)

  def test_cat_multi_arg(self):
    """Checks cat when several URL arguments are supplied at once."""
    bucket_uri = self.CreateBucket()
    digits = '0123456789'
    letters = 'abcdefghij'
    digits_uri = self.CreateObject(bucket_uri=bucket_uri, contents=digits)
    letters_uri = self.CreateObject(bucket_uri=bucket_uri, contents=letters)

    # Valid object first, bad URL second: the first object's data is still
    # printed before the failure is reported.
    stdout, stderr = self.RunGsUtil(
        ['cat', suri(digits_uri), suri(bucket_uri) + 'nonexistent'],
        return_stdout=True, return_stderr=True, expected_status=1)
    self.assertIn(digits, stdout)
    self.assertIn('NotFoundException', stderr)

    # Bad URL first: the exception should halt output immediately, so the
    # valid object's data must not appear on stdout.
    stdout, stderr = self.RunGsUtil(
        ['cat', suri(bucket_uri) + 'nonexistent', suri(digits_uri)],
        return_stdout=True, return_stderr=True, expected_status=1)
    self.assertNotIn(digits, stdout)
    self.assertIn('NotFoundException', stderr)

    # Two valid objects: their contents are printed back to back, in order.
    stdout = self.RunGsUtil(['cat', suri(digits_uri), suri(letters_uri)],
                            return_stdout=True)
    self.assertIn(digits + letters, stdout)