1 # -*- coding: utf-8 -*-
2 # Copyright 2012 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Name expansion iterator and result classes.
17 Name expansion support for the various ways gsutil lets users refer to
18 collections of data (via explicit wildcarding as well as directory,
19 bucket, and bucket subdir implicit wildcarding). This class encapsulates
20 the various rules for determining how these expansions are done.
23 # Disable warnings for NameExpansionIteratorQueue functions; they implement
24 # an interface which does not follow lint guidelines.
25 # pylint: disable=invalid-name
27 from __future__
import absolute_import
29 import multiprocessing
33 from gslib
.exception
import CommandException
34 from gslib
.plurality_checkable_iterator
import PluralityCheckableIterator
35 import gslib
.wildcard_iterator
36 from gslib
.wildcard_iterator
import StorageUrlFromString
39 class NameExpansionResult(object):
40 """Holds one fully expanded result from iterating over NameExpansionIterator.
42 The member data in this class need to be pickleable because
43 NameExpansionResult instances are passed through Multiprocessing.Queue. In
44 particular, don't include any boto state like StorageUri, since that pulls
45 in a big tree of objects, some of which aren't pickleable (and even if
46 they were, pickling/unpickling such a large object tree would result in
47 significant overhead).
49 The state held in this object is needed for handling the various naming cases
50 (e.g., copying from a single source URL to a directory generates different
51 dest URL names than copying multiple URLs to a directory, to be consistent
52 with naming rules used by the Unix cp command). For more details see comments
53 in _NameExpansionIterator.
56 def __init__(self
, source_storage_url
, is_multi_source_request
,
57 names_container
, expanded_storage_url
):
58 """Instantiates a result from name expansion.
61 source_storage_url: StorageUrl that was being expanded.
62 is_multi_source_request: bool indicator whether src_url_str expanded to
63 more than one BucketListingRef.
64 names_container: Bool indicator whether src_url names a container.
65 expanded_storage_url: StorageUrl that was expanded.
67 self
.source_storage_url
= source_storage_url
68 self
.is_multi_source_request
= is_multi_source_request
69 self
.names_container
= names_container
70 self
.expanded_storage_url
= expanded_storage_url
73 return '%s' % self
._expanded
_storage
_url
76 class _NameExpansionIterator(object):
77 """Class that iterates over all source URLs passed to the iterator.
79 See details in __iter__ function doc.
82 def __init__(self
, command_name
, debug
, logger
, gsutil_api
, url_strs
,
83 recursion_requested
, all_versions
=False,
84 cmd_supports_recursion
=True, project_id
=None,
85 continue_on_error
=False):
86 """Creates a NameExpansionIterator.
89 command_name: name of command being run.
90 debug: Debug level to pass to underlying iterators (range 0..3).
91 logger: logging.Logger object.
92 gsutil_api: Cloud storage interface. Settable for testing/mocking.
93 url_strs: PluralityCheckableIterator of URL strings needing expansion.
94 recursion_requested: True if -r specified on command-line. If so,
95 listings will be flattened so mapped-to results contain objects
96 spanning subdirectories.
97 all_versions: Bool indicating whether to iterate over all object versions.
98 cmd_supports_recursion: Bool indicating whether this command supports a
99 '-r' flag. Useful for printing helpful error messages.
100 project_id: Project id to use for bucket retrieval.
101 continue_on_error: If true, yield no-match exceptions encountered during
102 iteration instead of raising them.
104 Examples of _NameExpansionIterator with recursion_requested=True:
105 - Calling with one of the url_strs being 'gs://bucket' will enumerate all
106 top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
107 - 'gs://bucket/**' will enumerate all objects in the bucket.
108 - 'gs://bucket/abc' will enumerate either the single object abc or, if
109 abc is a subdirectory, all objects under abc and any of its
111 - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
113 - 'file:///tmp' will enumerate all files under /tmp, as will
115 - 'file:///tmp/**' will enumerate all files under /tmp or any of its
118 Example if recursion_requested=False:
119 calling with gs://bucket/abc/* lists matching objects
120 or subdirs, but not sub-subdirs or objects beneath subdirs.
122 Note: In step-by-step comments below we give examples assuming there's a
123 gs://bucket with object paths:
128 and a directory file://dir with file paths:
133 self
.command_name
= command_name
136 self
.gsutil_api
= gsutil_api
137 self
.url_strs
= url_strs
138 self
.recursion_requested
= recursion_requested
139 self
.all_versions
= all_versions
140 # Check self.url_strs.HasPlurality() at start because its value can change
141 # if url_strs is itself an iterator.
142 self
.url_strs
.has_plurality
= self
.url_strs
.HasPlurality()
143 self
.cmd_supports_recursion
= cmd_supports_recursion
144 self
.project_id
= project_id
145 self
.continue_on_error
= continue_on_error
147 # Map holding wildcard strings to use for flat vs subdir-by-subdir listings.
148 # (A flat listing means show all objects expanded all the way down.)
149 self
._flatness
_wildcard
= {True: '**', False: '*'}
152 """Iterates over all source URLs passed to the iterator.
154 For each src url, expands wildcards, object-less bucket names,
155 subdir bucket names, and directory names, and generates a flat listing of
156 all the matching objects/files.
158 You should instantiate this object using the static factory function
159 NameExpansionIterator, because consumers of this iterator need the
160 PluralityCheckableIterator wrapper built by that function.
163 gslib.name_expansion.NameExpansionResult.
166 CommandException: if errors encountered.
168 for url_str
in self
.url_strs
:
169 storage_url
= StorageUrlFromString(url_str
)
171 if storage_url
.IsFileUrl() and storage_url
.IsStream():
172 if self
.url_strs
.has_plurality
:
173 raise CommandException('Multiple URL strings are not supported '
174 'with streaming ("-") URLs.')
175 yield NameExpansionResult(storage_url
, False, False, storage_url
)
178 # Step 1: Expand any explicitly specified wildcards. The output from this
179 # step is an iterator of BucketListingRef.
180 # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd
182 src_names_bucket
= False
183 if (storage_url
.IsCloudUrl() and storage_url
.IsBucket()
184 and not self
.recursion_requested
):
185 # UNIX commands like rm and cp will omit directory references.
186 # If url_str refers only to buckets and we are not recursing,
187 # then produce references of type BUCKET, because they are guaranteed
188 # to pass through Step 2 and be omitted in Step 3.
189 post_step1_iter
= PluralityCheckableIterator(
190 self
.WildcardIterator(url_str
).IterBuckets(
191 bucket_fields
=['id']))
193 # Get a list of objects and prefixes, expanding the top level for
194 # any listed buckets. If our source is a bucket, however, we need
195 # to treat all of the top level expansions as names_container=True.
196 post_step1_iter
= PluralityCheckableIterator(
197 self
.WildcardIterator(url_str
).IterAll(
198 bucket_listing_fields
=['name'],
199 expand_top_level_buckets
=True))
200 if storage_url
.IsCloudUrl() and storage_url
.IsBucket():
201 src_names_bucket
= True
203 # Step 2: Expand bucket subdirs. The output from this
204 # step is an iterator of (names_container, BucketListingRef).
205 # Starting with gs://bucket/abcd this step would expand to:
206 # iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]).
207 subdir_exp_wildcard
= self
._flatness
_wildcard
[self
.recursion_requested
]
208 if self
.recursion_requested
:
209 post_step2_iter
= _ImplicitBucketSubdirIterator(
210 self
, post_step1_iter
, subdir_exp_wildcard
)
212 post_step2_iter
= _NonContainerTuplifyIterator(post_step1_iter
)
213 post_step2_iter
= PluralityCheckableIterator(post_step2_iter
)
215 # Because we actually perform and check object listings here, this will
216 # raise if url_args includes a non-existent object. However,
217 # plurality_checkable_iterator will buffer the exception for us, not
218 # raising it until the iterator is actually asked to yield the first
220 if post_step2_iter
.IsEmpty():
221 if self
.continue_on_error
:
223 raise CommandException('No URLs matched: %s' % url_str
)
224 except CommandException
, e
:
225 # Yield a specialized tuple of (exception, stack_trace) to
226 # the wrapping PluralityCheckableIterator.
227 yield (e
, sys
.exc_info()[2])
229 raise CommandException('No URLs matched: %s' % url_str
)
231 # Step 3. Omit any directories, buckets, or bucket subdirectories for
232 # non-recursive expansions.
233 post_step3_iter
= PluralityCheckableIterator(_OmitNonRecursiveIterator(
234 post_step2_iter
, self
.recursion_requested
, self
.command_name
,
235 self
.cmd_supports_recursion
, self
.logger
))
237 src_url_expands_to_multi
= post_step3_iter
.HasPlurality()
238 is_multi_source_request
= (self
.url_strs
.has_plurality
239 or src_url_expands_to_multi
)
241 # Step 4. Expand directories and buckets. This step yields the iterated
242 # values. Starting with gs://bucket this step would expand to:
243 # [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt]
244 # Starting with file://dir this step would expand to:
245 # [dir/a.txt, dir/b.txt, dir/c/]
246 for (names_container
, blr
) in post_step3_iter
:
247 src_names_container
= src_names_bucket
or names_container
250 yield NameExpansionResult(
251 storage_url
, is_multi_source_request
, src_names_container
,
254 # Use implicit wildcarding to do the enumeration.
255 # At this point we are guaranteed that:
256 # - Recursion has been requested because non-object entries are
257 # filtered in step 3 otherwise.
258 # - This is a prefix or bucket subdirectory because only
259 # non-recursive iterations product bucket references.
260 expanded_url
= StorageUrlFromString(blr
.url_string
)
261 if expanded_url
.IsFileUrl():
262 # Convert dir to implicit recursive wildcard.
263 url_to_iterate
= '%s%s%s' % (blr
, os
.sep
, subdir_exp_wildcard
)
265 # Convert subdir to implicit recursive wildcard.
266 url_to_iterate
= expanded_url
.CreatePrefixUrl(
267 wildcard_suffix
=subdir_exp_wildcard
)
269 wc_iter
= PluralityCheckableIterator(
270 self
.WildcardIterator(url_to_iterate
).IterObjects(
271 bucket_listing_fields
=['name']))
272 src_url_expands_to_multi
= (src_url_expands_to_multi
273 or wc_iter
.HasPlurality())
274 is_multi_source_request
= (self
.url_strs
.has_plurality
275 or src_url_expands_to_multi
)
276 # This will be a flattened listing of all underlying objects in the
279 yield NameExpansionResult(
280 storage_url
, is_multi_source_request
, True, blr
.storage_url
)
282 def WildcardIterator(self
, url_string
):
283 """Helper to instantiate gslib.WildcardIterator.
285 Args are same as gslib.WildcardIterator interface, but this method fills
286 in most of the values from instance state.
289 url_string: URL string naming wildcard objects to iterate.
292 Wildcard iterator over URL string.
294 return gslib
.wildcard_iterator
.CreateWildcardIterator(
295 url_string
, self
.gsutil_api
, debug
=self
.debug
,
296 all_versions
=self
.all_versions
,
297 project_id
=self
.project_id
)
300 def NameExpansionIterator(command_name
, debug
, logger
, gsutil_api
, url_strs
,
301 recursion_requested
, all_versions
=False,
302 cmd_supports_recursion
=True, project_id
=None,
303 continue_on_error
=False):
304 """Static factory function for instantiating _NameExpansionIterator.
306 This wraps the resulting iterator in a PluralityCheckableIterator and checks
307 that it is non-empty. Also, allows url_strs to be either an array or an
311 command_name: name of command being run.
312 debug: Debug level to pass to underlying iterators (range 0..3).
313 logger: logging.Logger object.
314 gsutil_api: Cloud storage interface. Settable for testing/mocking.
315 url_strs: Iterable URL strings needing expansion.
316 recursion_requested: True if -r specified on command-line. If so,
317 listings will be flattened so mapped-to results contain objects
318 spanning subdirectories.
319 all_versions: Bool indicating whether to iterate over all object versions.
320 cmd_supports_recursion: Bool indicating whether this command supports a '-r'
321 flag. Useful for printing helpful error messages.
322 project_id: Project id to use for the current command.
323 continue_on_error: If true, yield no-match exceptions encountered during
324 iteration instead of raising them.
327 CommandException if underlying iterator is empty.
330 Name expansion iterator instance.
332 For example semantics, see comments in NameExpansionIterator.__init__.
334 url_strs
= PluralityCheckableIterator(url_strs
)
335 name_expansion_iterator
= _NameExpansionIterator(
336 command_name
, debug
, logger
, gsutil_api
, url_strs
, recursion_requested
,
337 all_versions
=all_versions
, cmd_supports_recursion
=cmd_supports_recursion
,
338 project_id
=project_id
, continue_on_error
=continue_on_error
)
339 name_expansion_iterator
= PluralityCheckableIterator(name_expansion_iterator
)
340 if name_expansion_iterator
.IsEmpty():
341 raise CommandException('No URLs matched')
342 return name_expansion_iterator
345 class NameExpansionIteratorQueue(object):
346 """Wrapper around NameExpansionIterator with Multiprocessing.Queue interface.
348 Only a blocking get() function can be called, and the block and timeout
349 params on that function are ignored. All other class functions raise
352 This class is thread safe.
355 def __init__(self
, name_expansion_iterator
, final_value
):
356 self
.name_expansion_iterator
= name_expansion_iterator
357 self
.final_value
= final_value
358 self
.lock
= multiprocessing
.Manager().Lock()
361 raise NotImplementedError(
362 'NameExpansionIteratorQueue.qsize() not implemented')
365 raise NotImplementedError(
366 'NameExpansionIteratorQueue.empty() not implemented')
369 raise NotImplementedError(
370 'NameExpansionIteratorQueue.full() not implemented')
372 # pylint: disable=unused-argument
373 def put(self
, obj
=None, block
=None, timeout
=None):
374 raise NotImplementedError(
375 'NameExpansionIteratorQueue.put() not implemented')
377 def put_nowait(self
, obj
):
378 raise NotImplementedError(
379 'NameExpansionIteratorQueue.put_nowait() not implemented')
381 # pylint: disable=unused-argument
382 def get(self
, block
=None, timeout
=None):
385 if self
.name_expansion_iterator
.IsEmpty():
386 return self
.final_value
387 return self
.name_expansion_iterator
.next()
391 def get_nowait(self
):
392 raise NotImplementedError(
393 'NameExpansionIteratorQueue.get_nowait() not implemented')
395 def get_no_wait(self
):
396 raise NotImplementedError(
397 'NameExpansionIteratorQueue.get_no_wait() not implemented')
400 raise NotImplementedError(
401 'NameExpansionIteratorQueue.close() not implemented')
403 def join_thread(self
):
404 raise NotImplementedError(
405 'NameExpansionIteratorQueue.join_thread() not implemented')
407 def cancel_join_thread(self
):
408 raise NotImplementedError(
409 'NameExpansionIteratorQueue.cancel_join_thread() not implemented')
412 class _NonContainerTuplifyIterator(object):
413 """Iterator that produces the tuple (False, blr) for each iterated value.
415 Used for cases where blr_iter iterates over a set of
416 BucketListingRefs known not to name containers.
419 def __init__(self
, blr_iter
):
420 """Instantiates iterator.
423 blr_iter: iterator of BucketListingRef.
425 self
.blr_iter
= blr_iter
428 for blr
in self
.blr_iter
:
432 class _OmitNonRecursiveIterator(object):
433 """Iterator wrapper for that omits certain values for non-recursive requests.
435 This iterates over tuples of (names_container, BucketListingReference) and
436 omits directories, prefixes, and buckets from non-recurisve requests
437 so that we can properly calculate whether the source URL expands to multiple
440 For example, if we have a bucket containing two objects: bucket/foo and
441 bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be
445 def __init__(self
, tuple_iter
, recursion_requested
, command_name
,
446 cmd_supports_recursion
, logger
):
447 """Instanties the iterator.
450 tuple_iter: Iterator over names_container, BucketListingReference
451 from step 2 in the NameExpansionIterator
452 recursion_requested: If false, omit buckets, dirs, and subdirs
453 command_name: Command name for user messages
454 cmd_supports_recursion: Command recursion support for user messages
455 logger: Log object for user messages
457 self
.tuple_iter
= tuple_iter
458 self
.recursion_requested
= recursion_requested
459 self
.command_name
= command_name
460 self
.cmd_supports_recursion
= cmd_supports_recursion
464 for (names_container
, blr
) in self
.tuple_iter
:
465 if not self
.recursion_requested
and not blr
.IsObject():
466 # At this point we either have a bucket or a prefix,
467 # so if recursion is not requested, we're going to omit it.
468 expanded_url
= StorageUrlFromString(blr
.url_string
)
469 if expanded_url
.IsFileUrl():
473 if self
.cmd_supports_recursion
:
475 'Omitting %s "%s". (Did you mean to do %s -r?)',
476 desc
, blr
.url_string
, self
.command_name
)
478 self
.logger
.info('Omitting %s "%s".', desc
, blr
.url_string
)
480 yield (names_container
, blr
)
483 class _ImplicitBucketSubdirIterator(object):
484 """Iterator wrapper that performs implicit bucket subdir expansion.
486 Each iteration yields tuple (names_container, expanded BucketListingRefs)
487 where names_container is true if URL names a directory, bucket,
490 For example, iterating over [BucketListingRef("gs://abc")] would expand to:
491 [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")]
492 if those subdir objects exist, and [BucketListingRef("gs://abc") otherwise.
495 def __init__(self
, name_exp_instance
, blr_iter
, subdir_exp_wildcard
):
496 """Instantiates the iterator.
499 name_exp_instance: calling instance of NameExpansion class.
500 blr_iter: iterator over BucketListingRef prefixes and objects.
501 subdir_exp_wildcard: wildcard for expanding subdirectories;
502 expected values are ** if the mapped-to results should contain
503 objects spanning subdirectories, or * if only one level should
506 self
.blr_iter
= blr_iter
507 self
.name_exp_instance
= name_exp_instance
508 self
.subdir_exp_wildcard
= subdir_exp_wildcard
511 for blr
in self
.blr_iter
:
513 # This is a bucket subdirectory, list objects according to the wildcard.
514 prefix_url
= StorageUrlFromString(blr
.url_string
).CreatePrefixUrl(
515 wildcard_suffix
=self
.subdir_exp_wildcard
)
516 implicit_subdir_iterator
= PluralityCheckableIterator(
517 self
.name_exp_instance
.WildcardIterator(
518 prefix_url
).IterAll(bucket_listing_fields
=['name']))
519 if not implicit_subdir_iterator
.IsEmpty():
520 for exp_blr
in implicit_subdir_iterator
:
521 yield (True, exp_blr
)
523 # Prefix that contains no objects, for example in the $folder$ case
524 # or an empty filesystem directory.
529 raise CommandException(
530 '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr
)