1 # -*- coding: utf-8 -*-
2 # Copyright 2010 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Base class for gsutil commands.
17 In addition to base class code, this file contains helpers that depend on base
18 class state (such as GetAndPrintAcl) In general, functions that depend on
19 class state and that are used by multiple commands belong in this file.
20 Functions that don't depend on class state belong in util.py, and non-shared
21 helpers belong in individual subclasses.
24 from __future__
import absolute_import
27 from collections
import namedtuple
31 import multiprocessing
41 from boto
.storage_uri
import StorageUri
43 from gslib
.cloud_api
import AccessDeniedException
44 from gslib
.cloud_api
import ArgumentException
45 from gslib
.cloud_api
import ServiceException
46 from gslib
.cloud_api_delegator
import CloudApiDelegator
47 from gslib
.cs_api_map
import ApiSelector
48 from gslib
.cs_api_map
import GsutilApiMapFactory
49 from gslib
.exception
import CommandException
50 from gslib
.help_provider
import HelpProvider
51 from gslib
.name_expansion
import NameExpansionIterator
52 from gslib
.name_expansion
import NameExpansionResult
53 from gslib
.parallelism_framework_util
import AtomicIncrementDict
54 from gslib
.parallelism_framework_util
import BasicIncrementDict
55 from gslib
.parallelism_framework_util
import ThreadAndProcessSafeDict
56 from gslib
.plurality_checkable_iterator
import PluralityCheckableIterator
57 from gslib
.sig_handling
import RegisterSignalHandler
58 from gslib
.storage_url
import StorageUrlFromString
59 from gslib
.third_party
.storage_apitools
import storage_v1_messages
as apitools_messages
60 from gslib
.translation_helper
import AclTranslation
61 from gslib
.util
import GetConfigFilePath
62 from gslib
.util
import GsutilStreamHandler
63 from gslib
.util
import HaveFileUrls
64 from gslib
.util
import HaveProviderUrls
65 from gslib
.util
import IS_WINDOWS
66 from gslib
.util
import MultiprocessingIsAvailable
67 from gslib
.util
import NO_MAX
68 from gslib
.util
import UrlsAreForSingleProvider
69 from gslib
.util
import UTF8
70 from gslib
.wildcard_iterator
import CreateWildcardIterator
72 OFFER_GSUTIL_M_SUGGESTION_THRESHOLD
= 5
75 import ctypes
# pylint: disable=g-import-not-at-top
78 def _DefaultExceptionHandler(cls
, e
):
79 cls
.logger
.exception(e
)
82 def CreateGsutilLogger(command_name
):
83 """Creates a logger that resembles 'print' output.
85 This logger abides by gsutil -d/-D/-DD/-q options.
87 By default (if none of the above options is specified) the logger will display
88 all messages logged with level INFO or above. Log propagation is disabled.
91 command_name: Command name to create logger for.
96 log
= logging
.getLogger(command_name
)
98 log
.setLevel(logging
.root
.level
)
99 log_handler
= GsutilStreamHandler()
100 log_handler
.setFormatter(logging
.Formatter('%(message)s'))
101 # Commands that call other commands (like mv) would cause log handlers to be
102 # added more than once, so avoid adding if one is already present.
104 log
.addHandler(log_handler
)
108 def _UrlArgChecker(command_instance
, url
):
109 if not command_instance
.exclude_symlinks
:
111 exp_src_url
= url
.expanded_storage_url
112 if exp_src_url
.IsFileUrl() and os
.path
.islink(exp_src_url
.object_name
):
113 command_instance
.logger
.info('Skipping symbolic link %s...', exp_src_url
)
118 def DummyArgChecker(*unused_args
):
122 def SetAclFuncWrapper(cls
, name_expansion_result
, thread_state
=None):
123 return cls
.SetAclFunc(name_expansion_result
, thread_state
=thread_state
)
126 def SetAclExceptionHandler(cls
, e
):
127 """Exception handler that maintains state about post-completion status."""
128 cls
.logger
.error(str(e
))
129 cls
.everything_set_okay
= False
131 # We will keep this list of all thread- or process-safe queues ever created by
132 # the main thread so that we can forcefully kill them upon shutdown. Otherwise,
133 # we encounter a Python bug in which empty queues block forever on join (which
134 # is called as part of the Python exit function cleanup) under the impression
135 # that they are non-empty.
136 # However, this also lets us shut down somewhat more cleanly when interrupted.
140 def _NewMultiprocessingQueue():
141 queue
= multiprocessing
.Queue(MAX_QUEUE_SIZE
)
146 def _NewThreadsafeQueue():
147 queue
= Queue
.Queue(MAX_QUEUE_SIZE
)
151 # The maximum size of a process- or thread-safe queue. Imposing this limit
152 # prevents us from needing to hold an arbitrary amount of data in memory.
153 # However, setting this number too high (e.g., >= 32768 on OS X) can cause
154 # problems on some operating systems.
155 MAX_QUEUE_SIZE
= 32500
157 # That maximum depth of the tree of recursive calls to command.Apply. This is
158 # an arbitrary limit put in place to prevent developers from accidentally
159 # causing problems with infinite recursion, and it can be increased if needed.
160 MAX_RECURSIVE_DEPTH
= 5
162 ZERO_TASKS_TO_DO_ARGUMENT
= ('There were no', 'tasks to do')
164 # Map from deprecated aliases to the current command and subcommands that
165 # provide the same behavior.
166 # TODO: Remove this map and deprecate old commands on 9/9/14.
167 OLD_ALIAS_MAP
= {'chacl': ['acl', 'ch'],
168 'getacl': ['acl', 'get'],
169 'setacl': ['acl', 'set'],
170 'getcors': ['cors', 'get'],
171 'setcors': ['cors', 'set'],
172 'chdefacl': ['defacl', 'ch'],
173 'getdefacl': ['defacl', 'get'],
174 'setdefacl': ['defacl', 'set'],
175 'disablelogging': ['logging', 'set', 'off'],
176 'enablelogging': ['logging', 'set', 'on'],
177 'getlogging': ['logging', 'get'],
178 'getversioning': ['versioning', 'get'],
179 'setversioning': ['versioning', 'set'],
180 'getwebcfg': ['web', 'get'],
181 'setwebcfg': ['web', 'set']}
184 # Declare all of the module level variables - see
185 # InitializeMultiprocessingVariables for an explanation of why this is
187 # pylint: disable=global-at-module-level
188 global manager
, consumer_pools
, task_queues
, caller_id_lock
, caller_id_counter
189 global total_tasks
, call_completed_map
, global_return_values_map
190 global need_pool_or_done_cond
, caller_id_finished_count
, new_pool_needed
191 global current_max_recursive_level
, shared_vars_map
, shared_vars_list_map
192 global class_map
, worker_checking_level_lock
, failure_count
195 def InitializeMultiprocessingVariables():
196 """Initializes module-level variables that will be inherited by subprocesses.
198 On Windows, a multiprocessing.Manager object should only
199 be created within an "if __name__ == '__main__':" block. This function
200 must be called, otherwise every command that calls Command.Apply will fail.
202 # This list of global variables must exactly match the above list of
204 # pylint: disable=global-variable-undefined
205 global manager
, consumer_pools
, task_queues
, caller_id_lock
, caller_id_counter
206 global total_tasks
, call_completed_map
, global_return_values_map
207 global need_pool_or_done_cond
, caller_id_finished_count
, new_pool_needed
208 global current_max_recursive_level
, shared_vars_map
, shared_vars_list_map
209 global class_map
, worker_checking_level_lock
, failure_count
211 manager
= multiprocessing
.Manager()
215 # List of all existing task queues - used by all pools to find the queue
216 # that's appropriate for the given recursive_apply_level.
219 # Used to assign a globally unique caller ID to each Apply call.
220 caller_id_lock
= manager
.Lock()
221 caller_id_counter
= multiprocessing
.Value('i', 0)
223 # Map from caller_id to total number of tasks to be completed for that ID.
224 total_tasks
= ThreadAndProcessSafeDict(manager
)
226 # Map from caller_id to a boolean which is True iff all its tasks are
228 call_completed_map
= ThreadAndProcessSafeDict(manager
)
230 # Used to keep track of the set of return values for each caller ID.
231 global_return_values_map
= AtomicIncrementDict(manager
)
233 # Condition used to notify any waiting threads that a task has finished or
234 # that a call to Apply needs a new set of consumer processes.
235 need_pool_or_done_cond
= manager
.Condition()
237 # Lock used to prevent multiple worker processes from asking the main thread
238 # to create a new consumer pool for the same level.
239 worker_checking_level_lock
= manager
.Lock()
241 # Map from caller_id to the current number of completed tasks for that ID.
242 caller_id_finished_count
= AtomicIncrementDict(manager
)
244 # Used as a way for the main thread to distinguish between being woken up
245 # by another call finishing and being woken up by a call that needs a new set
246 # of consumer processes.
247 new_pool_needed
= multiprocessing
.Value('i', 0)
249 current_max_recursive_level
= multiprocessing
.Value('i', 0)
251 # Map from (caller_id, name) to the value of that shared variable.
252 shared_vars_map
= AtomicIncrementDict(manager
)
253 shared_vars_list_map
= ThreadAndProcessSafeDict(manager
)
255 # Map from caller_id to calling class.
256 class_map
= manager
.dict()
258 # Number of tasks that resulted in an exception in calls to Apply().
259 failure_count
= multiprocessing
.Value('i', 0)
262 # Each subclass of Command must define a property named 'command_spec' that is
263 # an instance of the following class.
264 CommandSpec
= namedtuple('CommandSpec', [
269 # List of command name aliases.
270 'command_name_aliases',
271 # Min number of args required by this command.
273 # Max number of args required by this command, or NO_MAX.
275 # Getopt-style string specifying acceptable sub args.
276 'supported_sub_args',
277 # True if file URLs are acceptable for this command.
279 # True if provider-only URLs are acceptable for this command.
281 # Index in args of first URL arg.
283 # List of supported APIs
285 # Default API to use for this command
287 # Private arguments (for internal testing)
288 'supported_private_args',
289 'argparse_arguments',
293 class Command(HelpProvider
):
294 """Base class for all gsutil commands."""
296 # Each subclass must override this with an instance of CommandSpec.
299 _commands_with_subcommands_and_subopts
= ['acl', 'defacl', 'logging', 'web',
302 # This keeps track of the recursive depth of the current call to Apply.
303 recursive_apply_level
= 0
305 # If the multiprocessing module isn't available, we'll use this to keep track
307 sequential_caller_id
= -1
310 def CreateCommandSpec(command_name
, usage_synopsis
=None,
311 command_name_aliases
=None, min_args
=0,
312 max_args
=NO_MAX
, supported_sub_args
='',
313 file_url_ok
=False, provider_url_ok
=False,
314 urls_start_arg
=0, gs_api_support
=None,
315 gs_default_api
=None, supported_private_args
=None,
316 argparse_arguments
=None):
317 """Creates an instance of CommandSpec, with defaults."""
319 command_name
=command_name
,
320 usage_synopsis
=usage_synopsis
,
321 command_name_aliases
=command_name_aliases
or [],
324 supported_sub_args
=supported_sub_args
,
325 file_url_ok
=file_url_ok
,
326 provider_url_ok
=provider_url_ok
,
327 urls_start_arg
=urls_start_arg
,
328 gs_api_support
=gs_api_support
or [ApiSelector
.XML
],
329 gs_default_api
=gs_default_api
or ApiSelector
.XML
,
330 supported_private_args
=supported_private_args
,
331 argparse_arguments
=argparse_arguments
or [])
333 # Define a convenience property for command name, since it's used many places.
334 def _GetDefaultCommandName(self
):
335 return self
.command_spec
.command_name
336 command_name
= property(_GetDefaultCommandName
)
338 def _CalculateUrlsStartArg(self
):
339 """Calculate the index in args of the first URL arg.
342 Index of the first URL arg (according to the command spec).
344 return self
.command_spec
.urls_start_arg
346 def _TranslateDeprecatedAliases(self
, args
):
347 """Map deprecated aliases to the corresponding new command, and warn."""
348 new_command_args
= OLD_ALIAS_MAP
.get(self
.command_alias_used
, None)
350 # Prepend any subcommands for the new command. The command name itself
351 # is not part of the args, so leave it out.
352 args
= new_command_args
[1:] + args
353 self
.logger
.warn('\n'.join(textwrap
.wrap(
354 ('You are using a deprecated alias, "%(used_alias)s", for the '
355 '"%(command_name)s" command. This will stop working on 9/9/2014. '
356 'Please use "%(command_name)s" with the appropriate sub-command in '
357 'the future. See "gsutil help %(command_name)s" for details.') %
358 {'used_alias': self
.command_alias_used
,
359 'command_name': self
.command_name
})))
362 def __init__(self
, command_runner
, args
, headers
, debug
, parallel_operations
,
363 bucket_storage_uri_class
, gsutil_api_class_map_factory
,
364 test_method
=None, logging_filters
=None,
365 command_alias_used
=None):
366 """Instantiates a Command.
369 command_runner: CommandRunner (for commands built atop other commands).
370 args: Command-line args (arg0 = actual arg, not command name ala bash).
371 headers: Dictionary containing optional HTTP headers to pass to boto.
372 debug: Debug level to pass in to boto connection (range 0..3).
373 parallel_operations: Should command operations be executed in parallel?
374 bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
375 Settable for testing/mocking.
376 gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
377 Settable for testing/mocking.
378 test_method: Optional general purpose method for testing purposes.
379 Application and semantics of this method will vary by
380 command and test type.
381 logging_filters: Optional list of logging.Filters to apply to this
383 command_alias_used: The alias that was actually used when running this
384 command (as opposed to the "official" command name,
385 which will always correspond to the file name).
387 Implementation note: subclasses shouldn't need to define an __init__
388 method, and instead depend on the shared initialization that happens
389 here. If you do define an __init__ method in a subclass you'll need to
390 explicitly call super().__init__(). But you're encouraged not to do this,
391 because it will make changing the __init__ interface more painful.
393 # Save class values from constructor params.
394 self
.command_runner
= command_runner
395 self
.unparsed_args
= args
396 self
.headers
= headers
398 self
.parallel_operations
= parallel_operations
399 self
.bucket_storage_uri_class
= bucket_storage_uri_class
400 self
.gsutil_api_class_map_factory
= gsutil_api_class_map_factory
401 self
.test_method
= test_method
402 self
.exclude_symlinks
= False
403 self
.recursion_requested
= False
404 self
.all_versions
= False
405 self
.command_alias_used
= command_alias_used
407 # Global instance of a threaded logger object.
408 self
.logger
= CreateGsutilLogger(self
.command_name
)
410 for log_filter
in logging_filters
:
411 self
.logger
.addFilter(log_filter
)
413 if self
.command_spec
is None:
414 raise CommandException('"%s" command implementation is missing a '
415 'command_spec definition.' % self
.command_name
)
417 # Parse and validate args.
418 self
.args
= self
._TranslateDeprecatedAliases
(args
)
421 # Named tuple public functions start with _
422 # pylint: disable=protected-access
423 self
.command_spec
= self
.command_spec
._replace
(
424 urls_start_arg
=self
._CalculateUrlsStartArg
())
426 if (len(self
.args
) < self
.command_spec
.min_args
427 or len(self
.args
) > self
.command_spec
.max_args
):
428 self
.RaiseWrongNumberOfArgumentsException()
430 if self
.command_name
not in self
._commands
_with
_subcommands
_and
_subopts
:
431 self
.CheckArguments()
433 # Build the support and default maps from the command spec.
435 'gs': self
.command_spec
.gs_api_support
,
436 's3': [ApiSelector
.XML
]
439 'gs': self
.command_spec
.gs_default_api
,
440 's3': ApiSelector
.XML
442 self
.gsutil_api_map
= GsutilApiMapFactory
.GetApiMap(
443 self
.gsutil_api_class_map_factory
, support_map
, default_map
)
445 self
.project_id
= None
446 self
.gsutil_api
= CloudApiDelegator(
447 bucket_storage_uri_class
, self
.gsutil_api_map
,
448 self
.logger
, debug
=self
.debug
)
450 # Cross-platform path to run gsutil binary.
452 # If running on Windows, invoke python interpreter explicitly.
453 if gslib
.util
.IS_WINDOWS
:
454 self
.gsutil_cmd
+= 'python '
455 # Add full path to gsutil to make sure we test the correct version.
456 self
.gsutil_path
= gslib
.GSUTIL_PATH
457 self
.gsutil_cmd
+= self
.gsutil_path
459 # We're treating recursion_requested like it's used by all commands, but
460 # only some of the commands accept the -R option.
462 for o
, unused_a
in self
.sub_opts
:
463 if o
== '-r' or o
== '-R':
464 self
.recursion_requested
= True
467 self
.multiprocessing_is_available
= MultiprocessingIsAvailable()[0]
469 def RaiseWrongNumberOfArgumentsException(self
):
470 """Raises exception for wrong number of arguments supplied to command."""
471 if len(self
.args
) < self
.command_spec
.min_args
:
472 tail_str
= 's' if self
.command_spec
.min_args
> 1 else ''
473 message
= ('The %s command requires at least %d argument%s.' %
474 (self
.command_name
, self
.command_spec
.min_args
, tail_str
))
476 message
= ('The %s command accepts at most %d arguments.' %
477 (self
.command_name
, self
.command_spec
.max_args
))
478 message
+= ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % (
479 self
.command_spec
.usage_synopsis
, self
.command_name
)
480 raise CommandException(message
)
482 def RaiseInvalidArgumentException(self
):
483 """Raises exception for specifying an invalid argument to command."""
484 message
= ('Incorrect option(s) specified. Usage:\n%s\n'
485 'For additional help run:\n gsutil help %s' % (
486 self
.command_spec
.usage_synopsis
, self
.command_name
))
487 raise CommandException(message
)
489 def ParseSubOpts(self
, check_args
=False):
490 """Parses sub-opt args.
493 check_args: True to have CheckArguments() called after parsing.
496 (self.sub_opts, self.args) from parsing.
498 Raises: RaiseInvalidArgumentException if invalid args specified.
501 self
.sub_opts
, self
.args
= getopt
.getopt(
502 self
.args
, self
.command_spec
.supported_sub_args
,
503 self
.command_spec
.supported_private_args
or [])
504 except getopt
.GetoptError
:
505 self
.RaiseInvalidArgumentException()
507 self
.CheckArguments()
509 def CheckArguments(self
):
510 """Checks that command line arguments match the command_spec.
512 Any commands in self._commands_with_subcommands_and_subopts are responsible
513 for calling this method after handling initial parsing of their arguments.
514 This prevents commands with sub-commands as well as options from breaking
515 the parsing of getopt.
517 TODO: Provide a function to parse commands and sub-commands more
518 intelligently once we stop allowing the deprecated command versions.
521 CommandException if the arguments don't match.
524 if (not self
.command_spec
.file_url_ok
525 and HaveFileUrls(self
.args
[self
.command_spec
.urls_start_arg
:])):
526 raise CommandException('"%s" command does not support "file://" URLs. '
527 'Did you mean to use a gs:// URL?' %
529 if (not self
.command_spec
.provider_url_ok
530 and HaveProviderUrls(self
.args
[self
.command_spec
.urls_start_arg
:])):
531 raise CommandException('"%s" command does not support provider-only '
532 'URLs.' % self
.command_name
)
534 def WildcardIterator(self
, url_string
, all_versions
=False):
535 """Helper to instantiate gslib.WildcardIterator.
537 Args are same as gslib.WildcardIterator interface, but this method fills in
538 most of the values from instance state.
541 url_string: URL string naming wildcard objects to iterate.
542 all_versions: If true, the iterator yields all versions of objects
543 matching the wildcard. If false, yields just the live
547 WildcardIterator for use by caller.
549 return CreateWildcardIterator(
550 url_string
, self
.gsutil_api
, all_versions
=all_versions
,
551 debug
=self
.debug
, project_id
=self
.project_id
)
553 def RunCommand(self
):
554 """Abstract function in base class. Subclasses must implement this.
556 The return value of this function will be used as the exit status of the
557 process, so subclass commands should return an integer exit code (0 for
558 success, a value in [1,255] for failure).
560 raise CommandException('Command %s is missing its RunCommand() '
561 'implementation' % self
.command_name
)
563 ############################################################
564 # Shared helper functions that depend on base class state. #
565 ############################################################
567 def ApplyAclFunc(self
, acl_func
, acl_excep_handler
, url_strs
):
568 """Sets the standard or default object ACL depending on self.command_name.
571 acl_func: ACL function to be passed to Apply.
572 acl_excep_handler: ACL exception handler to be passed to Apply.
573 url_strs: URL strings on which to set ACL.
576 CommandException if an ACL could not be set.
578 multi_threaded_url_args
= []
579 # Handle bucket ACL setting operations single-threaded, because
580 # our threading machinery currently assumes it's working with objects
581 # (name_expansion_iterator), and normally we wouldn't expect users to need
582 # to set ACLs on huge numbers of buckets at once anyway.
583 for url_str
in url_strs
:
584 url
= StorageUrlFromString(url_str
)
585 if url
.IsCloudUrl() and url
.IsBucket():
586 if self
.recursion_requested
:
587 # If user specified -R option, convert any bucket args to bucket
588 # wildcards (e.g., gs://bucket/*), to prevent the operation from
589 # being applied to the buckets themselves.
590 url
.object_name
= '*'
591 multi_threaded_url_args
.append(url
.url_string
)
593 # Convert to a NameExpansionResult so we can re-use the threaded
594 # function for the single-threaded implementation. RefType is unused.
595 for blr
in self
.WildcardIterator(url
.url_string
).IterBuckets(
596 bucket_fields
=['id']):
597 name_expansion_for_url
= NameExpansionResult(
598 url
, False, False, blr
.storage_url
)
599 acl_func(self
, name_expansion_for_url
)
601 multi_threaded_url_args
.append(url_str
)
603 if len(multi_threaded_url_args
) >= 1:
604 name_expansion_iterator
= NameExpansionIterator(
605 self
.command_name
, self
.debug
,
606 self
.logger
, self
.gsutil_api
,
607 multi_threaded_url_args
, self
.recursion_requested
,
608 all_versions
=self
.all_versions
,
609 continue_on_error
=self
.continue_on_error
or self
.parallel_operations
)
611 # Perform requests in parallel (-m) mode, if requested, using
612 # configured number of parallel processes and threads. Otherwise,
613 # perform requests with sequential function calls in current process.
614 self
.Apply(acl_func
, name_expansion_iterator
, acl_excep_handler
,
615 fail_on_error
=not self
.continue_on_error
)
617 if not self
.everything_set_okay
and not self
.continue_on_error
:
618 raise CommandException('ACLs for some objects could not be set.')
620 def SetAclFunc(self
, name_expansion_result
, thread_state
=None):
621 """Sets the object ACL for the name_expansion_result provided.
624 name_expansion_result: NameExpansionResult describing the target object.
625 thread_state: If present, use this gsutil Cloud API instance for the set.
628 assert not self
.def_acl
629 gsutil_api
= thread_state
631 gsutil_api
= self
.gsutil_api
632 op_string
= 'default object ACL' if self
.def_acl
else 'ACL'
633 url
= name_expansion_result
.expanded_storage_url
634 self
.logger
.info('Setting %s on %s...', op_string
, url
)
635 if (gsutil_api
.GetApiSelector(url
.scheme
) == ApiSelector
.XML
636 and url
.scheme
!= 'gs'):
637 # If we are called with a non-google ACL model, we need to use the XML
638 # passthrough. acl_arg should either be a canned ACL or an XML ACL.
639 self
._SetAclXmlPassthrough
(url
, gsutil_api
)
641 # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL.
642 self
._SetAclGsutilApi
(url
, gsutil_api
)
644 def _SetAclXmlPassthrough(self
, url
, gsutil_api
):
645 """Sets the ACL for the URL provided using the XML passthrough functions.
647 This function assumes that self.def_acl, self.canned,
648 and self.continue_on_error are initialized, and that self.acl_arg is
649 either an XML string or a canned ACL string.
652 url: CloudURL to set the ACL on.
653 gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML
654 passthrough functions.
657 orig_prefer_api
= gsutil_api
.prefer_api
658 gsutil_api
.prefer_api
= ApiSelector
.XML
659 gsutil_api
.XmlPassThroughSetAcl(
660 self
.acl_arg
, url
, canned
=self
.canned
,
661 def_obj_acl
=self
.def_acl
, provider
=url
.scheme
)
662 except ServiceException
as e
:
663 if self
.continue_on_error
:
664 self
.everything_set_okay
= False
669 gsutil_api
.prefer_api
= orig_prefer_api
671 def _SetAclGsutilApi(self
, url
, gsutil_api
):
672 """Sets the ACL for the URL provided using the gsutil Cloud API.
674 This function assumes that self.def_acl, self.canned,
675 and self.continue_on_error are initialized, and that self.acl_arg is
676 either a JSON string or a canned ACL string.
679 url: CloudURL to set the ACL on.
680 gsutil_api: gsutil Cloud API to use for the ACL set.
686 gsutil_api
.PatchBucket(
687 url
.bucket_name
, apitools_messages
.Bucket(),
688 canned_def_acl
=self
.acl_arg
, provider
=url
.scheme
, fields
=['id'])
690 def_obj_acl
= AclTranslation
.JsonToMessage(
691 self
.acl_arg
, apitools_messages
.ObjectAccessControl
)
692 bucket_metadata
= apitools_messages
.Bucket(
693 defaultObjectAcl
=def_obj_acl
)
694 gsutil_api
.PatchBucket(url
.bucket_name
, bucket_metadata
,
695 provider
=url
.scheme
, fields
=['id'])
698 gsutil_api
.PatchBucket(
699 url
.bucket_name
, apitools_messages
.Bucket(),
700 canned_acl
=self
.acl_arg
, provider
=url
.scheme
, fields
=['id'])
702 bucket_acl
= AclTranslation
.JsonToMessage(
703 self
.acl_arg
, apitools_messages
.BucketAccessControl
)
704 bucket_metadata
= apitools_messages
.Bucket(acl
=bucket_acl
)
705 gsutil_api
.PatchBucket(url
.bucket_name
, bucket_metadata
,
706 provider
=url
.scheme
, fields
=['id'])
707 else: # url.IsObject()
709 gsutil_api
.PatchObjectMetadata(
710 url
.bucket_name
, url
.object_name
, apitools_messages
.Object(),
711 provider
=url
.scheme
, generation
=url
.generation
,
712 canned_acl
=self
.acl_arg
)
714 object_acl
= AclTranslation
.JsonToMessage(
715 self
.acl_arg
, apitools_messages
.ObjectAccessControl
)
716 object_metadata
= apitools_messages
.Object(acl
=object_acl
)
717 gsutil_api
.PatchObjectMetadata(url
.bucket_name
, url
.object_name
,
718 object_metadata
, provider
=url
.scheme
,
719 generation
=url
.generation
)
720 except ArgumentException
, e
:
722 except ServiceException
, e
:
723 if self
.continue_on_error
:
724 self
.everything_set_okay
= False
729 def SetAclCommandHelper(self
, acl_func
, acl_excep_handler
):
730 """Sets ACLs on the self.args using the passed-in acl function.
733 acl_func: ACL function to be passed to Apply.
734 acl_excep_handler: ACL exception handler to be passed to Apply.
736 acl_arg
= self
.args
[0]
737 url_args
= self
.args
[1:]
738 # Disallow multi-provider setacl requests, because there are differences in
740 if not UrlsAreForSingleProvider(url_args
):
741 raise CommandException('"%s" command spanning providers not allowed.' %
744 # Determine whether acl_arg names a file containing XML ACL text vs. the
745 # string name of a canned ACL.
746 if os
.path
.isfile(acl_arg
):
747 with codecs
.open(acl_arg
, 'r', UTF8
) as f
:
751 # No file exists, so expect a canned ACL string.
752 # Canned ACLs are not supported in JSON and we need to use the XML API
754 # validate=False because we allow wildcard urls.
755 storage_uri
= boto
.storage_uri(
756 url_args
[0], debug
=self
.debug
, validate
=False,
757 bucket_storage_uri_class
=self
.bucket_storage_uri_class
)
759 canned_acls
= storage_uri
.canned_acls()
760 if acl_arg
not in canned_acls
:
761 raise CommandException('Invalid canned ACL "%s".' % acl_arg
)
764 # Used to track if any ACLs failed to be set.
765 self
.everything_set_okay
= True
766 self
.acl_arg
= acl_arg
768 self
.ApplyAclFunc(acl_func
, acl_excep_handler
, url_args
)
769 if not self
.everything_set_okay
and not self
.continue_on_error
:
770 raise CommandException('ACLs for some objects could not be set.')
772 def _WarnServiceAccounts(self
):
773 """Warns service account users who have received an AccessDenied error.
775 When one of the metadata-related commands fails due to AccessDenied, user
776 must ensure that they are listed as an Owner in the API console.
778 # Import this here so that the value will be set first in
779 # gcs_oauth2_boto_plugin.
780 # pylint: disable=g-import-not-at-top
781 from gcs_oauth2_boto_plugin
.oauth2_plugin
import IS_SERVICE_ACCOUNT
783 if IS_SERVICE_ACCOUNT
:
784 # This method is only called when canned ACLs are used, so the warning
785 # definitely applies.
786 self
.logger
.warning('\n'.join(textwrap
.wrap(
787 'It appears that your service account has been denied access while '
788 'attempting to perform a metadata operation. If you believe that you '
789 'should have access to this metadata (i.e., if it is associated with '
790 'your account), please make sure that your service account''s email '
791 'address is listed as an Owner in the Team tab of the API console. '
792 'See "gsutil help creds" for further information.\n')))
794 def GetAndPrintAcl(self
, url_str
):
795 """Prints the standard or default object ACL depending on self.command_name.
798 url_str: URL string to get ACL for.
800 blr
= self
.GetAclCommandBucketListingReference(url_str
)
801 url
= StorageUrlFromString(url_str
)
802 if (self
.gsutil_api
.GetApiSelector(url
.scheme
) == ApiSelector
.XML
803 and url
.scheme
!= 'gs'):
804 # Need to use XML passthrough.
806 acl
= self
.gsutil_api
.XmlPassThroughGetAcl(
807 url
, def_obj_acl
=self
.def_acl
, provider
=url
.scheme
)
809 except AccessDeniedException
, _
:
810 self
._WarnServiceAccounts
()
813 if self
.command_name
== 'defacl':
814 acl
= blr
.root_object
.defaultObjectAcl
817 'No default object ACL present for %s. This could occur if '
818 'the default object ACL is private, in which case objects '
819 'created in this bucket will be readable only by their '
820 'creators. It could also mean you do not have OWNER permission '
821 'on %s and therefore do not have permission to read the '
822 'default object ACL.', url_str
, url_str
)
824 acl
= blr
.root_object
.acl
826 self
._WarnServiceAccounts
()
827 raise AccessDeniedException('Access denied. Please ensure you have '
828 'OWNER permission on %s.' % url_str
)
829 print AclTranslation
.JsonFromMessage(acl
)
831 def GetAclCommandBucketListingReference(self
, url_str
):
832 """Gets a single bucket listing reference for an acl get command.
835 url_str: URL string to get the bucket listing reference for.
838 BucketListingReference for the URL string.
841 CommandException if string did not result in exactly one reference.
843 # We're guaranteed by caller that we have the appropriate type of url
844 # string for the call (ex. we will never be called with an object string
846 wildcard_url
= StorageUrlFromString(url_str
)
847 if wildcard_url
.IsObject():
848 plurality_iter
= PluralityCheckableIterator(
849 self
.WildcardIterator(url_str
).IterObjects(
850 bucket_listing_fields
=['acl']))
852 # Bucket or provider. We call IterBuckets explicitly here to ensure that
853 # the root object is populated with the acl.
854 if self
.command_name
== 'defacl':
855 bucket_fields
= ['defaultObjectAcl']
857 bucket_fields
= ['acl']
858 plurality_iter
= PluralityCheckableIterator(
859 self
.WildcardIterator(url_str
).IterBuckets(
860 bucket_fields
=bucket_fields
))
861 if plurality_iter
.IsEmpty():
862 raise CommandException('No URLs matched')
863 if plurality_iter
.HasPlurality():
864 raise CommandException(
865 '%s matched more than one URL, which is not allowed by the %s '
866 'command' % (url_str
, self
.command_name
))
867 return list(plurality_iter
)[0]
869 def _HandleMultiProcessingSigs(self
, unused_signal_num
,
870 unused_cur_stack_frame
):
871 """Handles signals INT AND TERM during a multi-process/multi-thread request.
876 unused_signal_num: signal generated by ^C.
877 unused_cur_stack_frame: Current stack frame.
879 # Note: This only works under Linux/MacOS. See
880 # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
881 # about why making it work correctly across OS's is harder and still open.
883 sys
.stderr
.write('Caught ^C - exiting\n')
884 # Simply calling sys.exit(1) doesn't work - see above bug for details.
885 KillProcess(os
.getpid())
887 def GetSingleBucketUrlFromArg(self
, arg
, bucket_fields
=None):
888 """Gets a single bucket URL based on the command arguments.
891 arg: String argument to get bucket URL for.
892 bucket_fields: Fields to populate for the bucket.
895 (StorageUrl referring to a single bucket, Bucket metadata).
898 CommandException if args did not match exactly one bucket.
900 plurality_checkable_iterator
= self
.GetBucketUrlIterFromArg(
901 arg
, bucket_fields
=bucket_fields
)
902 if plurality_checkable_iterator
.HasPlurality():
903 raise CommandException(
904 '%s matched more than one URL, which is not\n'
905 'allowed by the %s command' % (arg
, self
.command_name
))
906 blr
= list(plurality_checkable_iterator
)[0]
907 return StorageUrlFromString(blr
.url_string
), blr
.root_object
909 def GetBucketUrlIterFromArg(self
, arg
, bucket_fields
=None):
910 """Gets a single bucket URL based on the command arguments.
913 arg: String argument to iterate over.
914 bucket_fields: Fields to populate for the bucket.
917 PluralityCheckableIterator over buckets.
920 CommandException if iterator matched no buckets.
922 arg_url
= StorageUrlFromString(arg
)
923 if not arg_url
.IsCloudUrl() or arg_url
.IsObject():
924 raise CommandException('"%s" command must specify a bucket' %
927 plurality_checkable_iterator
= PluralityCheckableIterator(
928 self
.WildcardIterator(arg
).IterBuckets(
929 bucket_fields
=bucket_fields
))
930 if plurality_checkable_iterator
.IsEmpty():
931 raise CommandException('No URLs matched')
932 return plurality_checkable_iterator
934 ######################
935 # Private functions. #
936 ######################
938 def _ResetConnectionPool(self
):
939 # Each OS process needs to establish its own set of connections to
940 # the server to avoid writes from different OS processes interleaving
941 # onto the same socket (and garbling the underlying SSL session).
942 # We ensure each process gets its own set of connections here by
943 # closing all connections in the storage provider connection pool.
944 connection_pool
= StorageUri
.provider_pool
946 for i
in connection_pool
:
947 connection_pool
[i
].connection
.close()
949 def _GetProcessAndThreadCount(self
, process_count
, thread_count
,
950 parallel_operations_override
):
951 """Determines the values of process_count and thread_count.
953 These values are used for parallel operations.
954 If we're not performing operations in parallel, then ignore
955 existing values and use process_count = thread_count = 1.
958 process_count: A positive integer or None. In the latter case, we read
959 the value from the .boto config file.
960 thread_count: A positive integer or None. In the latter case, we read
961 the value from the .boto config file.
962 parallel_operations_override: Used to override self.parallel_operations.
963 This allows the caller to safely override
964 the top-level flag for a single call.
967 (process_count, thread_count): The number of processes and threads to use,
970 # Set OS process and python thread count as a function of options
972 if self
.parallel_operations
or parallel_operations_override
:
973 if not process_count
:
974 process_count
= boto
.config
.getint(
975 'GSUtil', 'parallel_process_count',
976 gslib
.commands
.config
.DEFAULT_PARALLEL_PROCESS_COUNT
)
977 if process_count
< 1:
978 raise CommandException('Invalid parallel_process_count "%d".' %
981 thread_count
= boto
.config
.getint(
982 'GSUtil', 'parallel_thread_count',
983 gslib
.commands
.config
.DEFAULT_PARALLEL_THREAD_COUNT
)
985 raise CommandException('Invalid parallel_thread_count "%d".' %
988 # If -m not specified, then assume 1 OS process and 1 Python thread.
992 if IS_WINDOWS
and process_count
> 1:
993 raise CommandException('\n'.join(textwrap
.wrap(
994 ('It is not possible to set process_count > 1 on Windows. Please '
995 'update your config file (located at %s) and set '
996 '"parallel_process_count = 1".') %
997 GetConfigFilePath())))
998 self
.logger
.debug('process count: %d', process_count
)
999 self
.logger
.debug('thread count: %d', thread_count
)
1001 return (process_count
, thread_count
)
1003 def _SetUpPerCallerState(self
):
1004 """Set up the state for a caller id, corresponding to one Apply call."""
1005 # Get a new caller ID.
1006 with caller_id_lock
:
1007 caller_id_counter
.value
+= 1
1008 caller_id
= caller_id_counter
.value
1010 # Create a copy of self with an incremented recursive level. This allows
1011 # the class to report its level correctly if the function called from it
1012 # also needs to call Apply.
1013 cls
= copy
.copy(self
)
1014 cls
.recursive_apply_level
+= 1
1016 # Thread-safe loggers can't be pickled, so we will remove it here and
1017 # recreate it later in the WorkerThread. This is not a problem since any
1018 # logger with the same name will be treated as a singleton.
1021 # Likewise, the default API connection can't be pickled, but it is unused
1022 # anyway as each thread gets its own API delegator.
1023 cls
.gsutil_api
= None
1025 class_map
[caller_id
] = cls
1026 total_tasks
[caller_id
] = -1 # -1 => the producer hasn't finished yet.
1027 call_completed_map
[caller_id
] = False
1028 caller_id_finished_count
.Put(caller_id
, 0)
1029 global_return_values_map
.Put(caller_id
, [])
1032 def _CreateNewConsumerPool(self
, num_processes
, num_threads
):
1033 """Create a new pool of processes that call _ApplyThreads."""
1035 task_queue
= _NewMultiprocessingQueue()
1036 task_queues
.append(task_queue
)
1038 current_max_recursive_level
.value
+= 1
1039 if current_max_recursive_level
.value
> MAX_RECURSIVE_DEPTH
:
1040 raise CommandException('Recursion depth of Apply calls is too great.')
1041 for _
in range(num_processes
):
1042 recursive_apply_level
= len(consumer_pools
)
1043 p
= multiprocessing
.Process(
1044 target
=self
._ApplyThreads
,
1045 args
=(num_threads
, num_processes
, recursive_apply_level
))
1049 consumer_pool
= _ConsumerPool(processes
, task_queue
)
1050 consumer_pools
.append(consumer_pool
)
1052 def Apply(self
, func
, args_iterator
, exception_handler
,
1053 shared_attrs
=None, arg_checker
=_UrlArgChecker
,
1054 parallel_operations_override
=False, process_count
=None,
1055 thread_count
=None, should_return_results
=False,
1056 fail_on_error
=False):
1057 """Calls _Parallel/SequentialApply based on multiprocessing availability.
1060 func: Function to call to process each argument.
1061 args_iterator: Iterable collection of arguments to be put into the
1063 exception_handler: Exception handler for WorkerThread class.
1064 shared_attrs: List of attributes to manage across sub-processes.
1065 arg_checker: Used to determine whether we should process the current
1066 argument or simply skip it. Also handles any logging that
1067 is specific to a particular type of argument.
1068 parallel_operations_override: Used to override self.parallel_operations.
1069 This allows the caller to safely override
1070 the top-level flag for a single call.
1071 process_count: The number of processes to use. If not specified, then
1072 the configured default will be used.
1073 thread_count: The number of threads per process. If not speficied, then
1074 the configured default will be used..
1075 should_return_results: If true, then return the results of all successful
1076 calls to func in a list.
1077 fail_on_error: If true, then raise any exceptions encountered when
1078 executing func. This is only applicable in the case of
1079 process_count == thread_count == 1.
1082 Results from spawned threads.
1085 original_shared_vars_values
= {} # We'll add these back in at the end.
1086 for name
in shared_attrs
:
1087 original_shared_vars_values
[name
] = getattr(self
, name
)
1088 # By setting this to 0, we simplify the logic for computing deltas.
1089 # We'll add it back after all of the tasks have been performed.
1090 setattr(self
, name
, 0)
1092 (process_count
, thread_count
) = self
._GetProcessAndThreadCount
(
1093 process_count
, thread_count
, parallel_operations_override
)
1095 is_main_thread
= (self
.recursive_apply_level
== 0
1096 and self
.sequential_caller_id
== -1)
1098 # We don't honor the fail_on_error flag in the case of multiple threads
1100 fail_on_error
= fail_on_error
and (process_count
* thread_count
== 1)
1102 # Only check this from the first call in the main thread. Apart from the
1103 # fact that it's wasteful to try this multiple times in general, it also
1104 # will never work when called from a subprocess since we use daemon
1105 # processes, and daemons can't create other processes.
1107 if ((not self
.multiprocessing_is_available
)
1108 and thread_count
* process_count
> 1):
1109 # Run the check again and log the appropriate warnings. This was run
1110 # before, when the Command object was created, in order to calculate
1111 # self.multiprocessing_is_available, but we don't want to print the
1112 # warning until we're sure the user actually tried to use multiple
1113 # threads or processes.
1114 MultiprocessingIsAvailable(logger
=self
.logger
)
1116 if self
.multiprocessing_is_available
:
1117 caller_id
= self
._SetUpPerCallerState
()
1119 self
.sequential_caller_id
+= 1
1120 caller_id
= self
.sequential_caller_id
1123 # pylint: disable=global-variable-undefined
1124 global global_return_values_map
, shared_vars_map
, failure_count
1125 global caller_id_finished_count
, shared_vars_list_map
1126 global_return_values_map
= BasicIncrementDict()
1127 global_return_values_map
.Put(caller_id
, [])
1128 shared_vars_map
= BasicIncrementDict()
1129 caller_id_finished_count
= BasicIncrementDict()
1130 shared_vars_list_map
= {}
1133 # If any shared attributes passed by caller, create a dictionary of
1134 # shared memory variables for every element in the list of shared
1137 shared_vars_list_map
[caller_id
] = shared_attrs
1138 for name
in shared_attrs
:
1139 shared_vars_map
.Put((caller_id
, name
), 0)
1141 # Make all of the requested function calls.
1142 if self
.multiprocessing_is_available
and thread_count
* process_count
> 1:
1143 self
._ParallelApply
(func
, args_iterator
, exception_handler
, caller_id
,
1144 arg_checker
, process_count
, thread_count
,
1145 should_return_results
, fail_on_error
)
1147 self
._SequentialApply
(func
, args_iterator
, exception_handler
, caller_id
,
1148 arg_checker
, should_return_results
, fail_on_error
)
1151 for name
in shared_attrs
:
1152 # This allows us to retain the original value of the shared variable,
1153 # and simply apply the delta after what was done during the call to
1155 final_value
= (original_shared_vars_values
[name
] +
1156 shared_vars_map
.Get((caller_id
, name
)))
1157 setattr(self
, name
, final_value
)
1159 if should_return_results
:
1160 return global_return_values_map
.Get(caller_id
)
1162 def _MaybeSuggestGsutilDashM(self
):
1163 """Outputs a sugestion to the user to use gsutil -m."""
1164 if not (boto
.config
.getint('GSUtil', 'parallel_process_count', 0) == 1 and
1165 boto
.config
.getint('GSUtil', 'parallel_thread_count', 0) == 1):
1166 self
.logger
.info('\n' + textwrap
.fill(
1167 '==> NOTE: You are performing a sequence of gsutil operations that '
1168 'may run significantly faster if you instead use gsutil -m %s ...\n'
1169 'Please see the -m section under "gsutil help options" for further '
1170 'information about when gsutil -m can be advantageous.'
1171 % sys
.argv
[1]) + '\n')
1173 # pylint: disable=g-doc-args
1174 def _SequentialApply(self
, func
, args_iterator
, exception_handler
, caller_id
,
1175 arg_checker
, should_return_results
, fail_on_error
):
1176 """Performs all function calls sequentially in the current thread.
1178 No other threads or processes will be spawned. This degraded functionality
1179 is used when the multiprocessing module is not available or the user
1180 requests only one thread and one process.
1182 # Create a WorkerThread to handle all of the logic needed to actually call
1183 # the function. Note that this thread will never be started, and all work
1184 # is done in the current thread.
1185 worker_thread
= WorkerThread(None, False)
1186 args_iterator
= iter(args_iterator
)
1187 # Count of sequential calls that have been made. Used for producing
1188 # suggestion to use gsutil -m.
1189 sequential_call_count
= 0
1192 # Try to get the next argument, handling any exceptions that arise.
1194 args
= args_iterator
.next()
1195 except StopIteration, e
:
1197 except Exception, e
: # pylint: disable=broad-except
1198 _IncrementFailureCount()
1203 exception_handler(self
, e
)
1204 except Exception, _
: # pylint: disable=broad-except
1206 'Caught exception while handling exception for %s:\n%s',
1207 func
, traceback
.format_exc())
1210 sequential_call_count
+= 1
1211 if sequential_call_count
== OFFER_GSUTIL_M_SUGGESTION_THRESHOLD
:
1212 # Output suggestion near beginning of run, so user sees it early and can
1213 # ^C and try gsutil -m.
1214 self
._MaybeSuggestGsutilDashM
()
1215 if arg_checker(self
, args
):
1216 # Now that we actually have the next argument, perform the task.
1217 task
= Task(func
, args
, caller_id
, exception_handler
,
1218 should_return_results
, arg_checker
, fail_on_error
)
1219 worker_thread
.PerformTask(task
, self
)
1220 if sequential_call_count
>= gslib
.util
.GetTermLines():
1221 # Output suggestion at end of long run, in case user missed it at the
1222 # start and it scrolled off-screen.
1223 self
._MaybeSuggestGsutilDashM
()
1225 # pylint: disable=g-doc-args
1226 def _ParallelApply(self
, func
, args_iterator
, exception_handler
, caller_id
,
1227 arg_checker
, process_count
, thread_count
,
1228 should_return_results
, fail_on_error
):
1229 """Dispatches input arguments across a thread/process pool.
1231 Pools are composed of parallel OS processes and/or Python threads,
1232 based on options (-m or not) and settings in the user's config file.
1234 If only one OS process is requested/available, dispatch requests across
1235 threads in the current OS process.
1237 In the multi-process case, we will create one pool of worker processes for
1238 each level of the tree of recursive calls to Apply. E.g., if A calls
1239 Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
1240 will only create two sets of worker processes - B will execute in the first,
1241 and C and D will execute in the second. If C is then changed to call
1242 Apply(E) and D is changed to call Apply(F), then we will automatically
1243 create a third set of processes (lazily, when needed) that will be used to
1244 execute calls to E and F. This might look something like:
1252 Apply's parallelism is generally broken up into 4 cases:
1253 - If process_count == thread_count == 1, then all tasks will be executed
1254 by _SequentialApply.
1255 - If process_count > 1 and thread_count == 1, then the main thread will
1256 create a new pool of processes (if they don't already exist) and each of
1257 those processes will execute the tasks in a single thread.
1258 - If process_count == 1 and thread_count > 1, then this process will create
1259 a new pool of threads to execute the tasks.
1260 - If process_count > 1 and thread_count > 1, then the main thread will
1261 create a new pool of processes (if they don't already exist) and each of
1262 those processes will, upon creation, create a pool of threads to
1266 caller_id: The caller ID unique to this call to command.Apply.
1267 See command.Apply for description of other arguments.
1269 is_main_thread
= self
.recursive_apply_level
== 0
1271 # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before
1273 if not IS_WINDOWS
and is_main_thread
:
1274 # Register as a final signal handler because this handler kills the
1275 # main gsutil process (so it must run last).
1276 RegisterSignalHandler(signal
.SIGINT
, self
._HandleMultiProcessingSigs
,
1277 is_final_handler
=True)
1278 RegisterSignalHandler(signal
.SIGTERM
, self
._HandleMultiProcessingSigs
,
1279 is_final_handler
=True)
1282 # The process we create will need to access the next recursive level
1283 # of task queues if it makes a call to Apply, so we always keep around
1284 # one more queue than we know we need. OTOH, if we don't create a new
1285 # process, the existing process still needs a task queue to use.
1286 task_queues
.append(_NewMultiprocessingQueue())
1288 if process_count
> 1: # Handle process pool creation.
1289 # Check whether this call will need a new set of workers.
1291 # Each worker must acquire a shared lock before notifying the main thread
1292 # that it needs a new worker pool, so that at most one worker asks for
1293 # a new worker pool at once.
1295 if not is_main_thread
:
1296 worker_checking_level_lock
.acquire()
1297 if self
.recursive_apply_level
>= current_max_recursive_level
.value
:
1298 with need_pool_or_done_cond
:
1299 # Only the main thread is allowed to create new processes -
1300 # otherwise, we will run into some Python bugs.
1302 self
._CreateNewConsumerPool
(process_count
, thread_count
)
1304 # Notify the main thread that we need a new consumer pool.
1305 new_pool_needed
.value
= 1
1306 need_pool_or_done_cond
.notify_all()
1307 # The main thread will notify us when it finishes.
1308 need_pool_or_done_cond
.wait()
1310 if not is_main_thread
:
1311 worker_checking_level_lock
.release()
1313 # If we're running in this process, create a separate task queue. Otherwise,
1314 # if Apply has already been called with process_count > 1, then there will
1315 # be consumer pools trying to use our processes.
1316 if process_count
> 1:
1317 task_queue
= task_queues
[self
.recursive_apply_level
]
1319 task_queue
= _NewMultiprocessingQueue()
1321 # Kick off a producer thread to throw tasks in the global task queue. We
1322 # do this asynchronously so that the main thread can be free to create new
1323 # consumer pools when needed (otherwise, any thread with a task that needs
1324 # a new consumer pool must block until we're completely done producing; in
1325 # the worst case, every worker blocks on such a call and the producer fills
1326 # up the task queue before it finishes, so we block forever).
1327 producer_thread
= ProducerThread(copy
.copy(self
), args_iterator
, caller_id
,
1328 func
, task_queue
, should_return_results
,
1329 exception_handler
, arg_checker
,
1332 if process_count
> 1:
1333 # Wait here until either:
1334 # 1. We're the main thread and someone needs a new consumer pool - in
1335 # which case we create one and continue waiting.
1336 # 2. Someone notifies us that all of the work we requested is done, in
1337 # which case we retrieve the results (if applicable) and stop
1340 with need_pool_or_done_cond
:
1341 # Either our call is done, or someone needs a new level of consumer
1342 # pools, or we the wakeup call was meant for someone else. It's
1343 # impossible for both conditions to be true, since the main thread is
1344 # blocked on any other ongoing calls to Apply, and a thread would not
1345 # ask for a new consumer pool unless it had more work to do.
1346 if call_completed_map
[caller_id
]:
1348 elif is_main_thread
and new_pool_needed
.value
:
1349 new_pool_needed
.value
= 0
1350 self
._CreateNewConsumerPool
(process_count
, thread_count
)
1351 need_pool_or_done_cond
.notify_all()
1353 # Note that we must check the above conditions before the wait() call;
1354 # otherwise, the notification can happen before we start waiting, in
1355 # which case we'll block forever.
1356 need_pool_or_done_cond
.wait()
1357 else: # Using a single process.
1358 self
._ApplyThreads
(thread_count
, process_count
,
1359 self
.recursive_apply_level
,
1360 is_blocking_call
=True, task_queue
=task_queue
)
1362 # We encountered an exception from the producer thread before any arguments
1363 # were enqueued, but it wouldn't have been propagated, so we'll now
1364 # explicitly raise it here.
1365 if producer_thread
.unknown_exception
:
1366 # pylint: disable=raising-bad-type
1367 raise producer_thread
.unknown_exception
1369 # We encountered an exception from the producer thread while iterating over
1370 # the arguments, so raise it here if we're meant to fail on error.
1371 if producer_thread
.iterator_exception
and fail_on_error
:
1372 # pylint: disable=raising-bad-type
1373 raise producer_thread
.iterator_exception
1375 def _ApplyThreads(self
, thread_count
, process_count
, recursive_apply_level
,
1376 is_blocking_call
=False, task_queue
=None):
1377 """Assigns the work from the multi-process global task queue.
1379 Work is assigned to an individual process for later consumption either by
1380 the WorkerThreads or (if thread_count == 1) this thread.
1383 thread_count: The number of threads used to perform the work. If 1, then
1384 perform all work in this thread.
1385 process_count: The number of processes used to perform the work.
1386 recursive_apply_level: The depth in the tree of recursive calls to Apply
1388 is_blocking_call: True iff the call to Apply is blocked on this call
1389 (which is true iff process_count == 1), implying that
1390 _ApplyThreads must behave as a blocking call.
1392 self
._ResetConnectionPool
()
1393 self
.recursive_apply_level
= recursive_apply_level
1395 task_queue
= task_queue
or task_queues
[recursive_apply_level
]
1397 assert thread_count
* process_count
> 1, (
1398 'Invalid state, calling command._ApplyThreads with only one thread '
1400 worker_pool
= WorkerPool(
1401 thread_count
, self
.logger
,
1402 bucket_storage_uri_class
=self
.bucket_storage_uri_class
,
1403 gsutil_api_map
=self
.gsutil_api_map
, debug
=self
.debug
)
1407 task
= task_queue
.get()
1408 if task
.args
!= ZERO_TASKS_TO_DO_ARGUMENT
:
1409 # If we have no tasks to do and we're performing a blocking call, we
1410 # need a special signal to tell us to stop - otherwise, we block on
1411 # the call to task_queue.get() forever.
1412 worker_pool
.AddTask(task
)
1415 if is_blocking_call
:
1416 num_to_do
= total_tasks
[task
.caller_id
]
1417 # The producer thread won't enqueue the last task until after it has
1418 # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
1419 # we will do this check again.
1420 if num_to_do
>= 0 and num_enqueued
== num_to_do
:
1421 if thread_count
== 1:
1425 with need_pool_or_done_cond
:
1426 if call_completed_map
[task
.caller_id
]:
1427 # We need to check this first, in case the condition was
1428 # notified before we grabbed the lock.
1430 need_pool_or_done_cond
.wait()
1433 # Below here lie classes and functions related to controlling the flow of tasks
1434 # between various threads and processes.
1437 class _ConsumerPool(object):
1439 def __init__(self
, processes
, task_queue
):
1440 self
.processes
= processes
1441 self
.task_queue
= task_queue
1444 for process
in self
.processes
:
1445 KillProcess(process
.pid
)
1448 def KillProcess(pid
):
1449 """Make best effort to kill the given process.
1451 We ignore all exceptions so a caller looping through a list of processes will
1452 continue attempting to kill each, even if one encounters a problem.
1455 pid: The process ID.
1458 # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2.
1459 if IS_WINDOWS
and ((2, 6) <= sys
.version_info
[:3] < (2, 7) or
1460 (3, 0) <= sys
.version_info
[:3] < (3, 2)):
1461 kernel32
= ctypes
.windll
.kernel32
1462 handle
= kernel32
.OpenProcess(1, 0, pid
)
1463 kernel32
.TerminateProcess(handle
, 0)
1465 os
.kill(pid
, signal
.SIGKILL
)
1466 except: # pylint: disable=bare-except
1470 class Task(namedtuple('Task', (
1471 'func args caller_id exception_handler should_return_results arg_checker '
1473 """Task class representing work to be completed.
1476 func: The function to be executed.
1477 args: The arguments to func.
1478 caller_id: The globally-unique caller ID corresponding to the Apply call.
1479 exception_handler: The exception handler to use if the call to func fails.
1480 should_return_results: True iff the results of this function should be
1481 returned from the Apply call.
1482 arg_checker: Used to determine whether we should process the current
1483 argument or simply skip it. Also handles any logging that
1484 is specific to a particular type of argument.
1485 fail_on_error: If true, then raise any exceptions encountered when
1486 executing func. This is only applicable in the case of
1487 process_count == thread_count == 1.
1492 class ProducerThread(threading
.Thread
):
1493 """Thread used to enqueue work for other processes and threads."""
1495 def __init__(self
, cls
, args_iterator
, caller_id
, func
, task_queue
,
1496 should_return_results
, exception_handler
, arg_checker
,
1498 """Initializes the producer thread.
1501 cls: Instance of Command for which this ProducerThread was created.
1502 args_iterator: Iterable collection of arguments to be put into the
1504 caller_id: Globally-unique caller ID corresponding to this call to Apply.
1505 func: The function to be called on each element of args_iterator.
1506 task_queue: The queue into which tasks will be put, to later be consumed
1507 by Command._ApplyThreads.
1508 should_return_results: True iff the results for this call to command.Apply
1510 exception_handler: The exception handler to use when errors are
1511 encountered during calls to func.
1512 arg_checker: Used to determine whether we should process the current
1513 argument or simply skip it. Also handles any logging that
1514 is specific to a particular type of argument.
1515 fail_on_error: If true, then raise any exceptions encountered when
1516 executing func. This is only applicable in the case of
1517 process_count == thread_count == 1.
1519 super(ProducerThread
, self
).__init
__()
1522 self
.args_iterator
= args_iterator
1523 self
.caller_id
= caller_id
1524 self
.task_queue
= task_queue
1525 self
.arg_checker
= arg_checker
1526 self
.exception_handler
= exception_handler
1527 self
.should_return_results
= should_return_results
1528 self
.fail_on_error
= fail_on_error
1529 self
.shared_variables_updater
= _SharedVariablesUpdater()
1531 self
.unknown_exception
= None
1532 self
.iterator_exception
= None
1540 args_iterator
= iter(self
.args_iterator
)
1543 args
= args_iterator
.next()
1544 except StopIteration, e
:
1546 except Exception, e
: # pylint: disable=broad-except
1547 _IncrementFailureCount()
1548 if self
.fail_on_error
:
1549 self
.iterator_exception
= e
1553 self
.exception_handler(self
.cls
, e
)
1554 except Exception, _
: # pylint: disable=broad-except
1555 self
.cls
.logger
.debug(
1556 'Caught exception while handling exception for %s:\n%s',
1557 self
.func
, traceback
.format_exc())
1558 self
.shared_variables_updater
.Update(self
.caller_id
, self
.cls
)
1561 if self
.arg_checker(self
.cls
, args
):
1563 last_task
= cur_task
1564 cur_task
= Task(self
.func
, args
, self
.caller_id
,
1565 self
.exception_handler
, self
.should_return_results
,
1566 self
.arg_checker
, self
.fail_on_error
)
1568 self
.task_queue
.put(last_task
)
1569 except Exception, e
: # pylint: disable=broad-except
1570 # This will also catch any exception raised due to an error in the
1571 # iterator when fail_on_error is set, so check that we failed for some
1572 # other reason before claiming that we had an unknown exception.
1573 if not self
.iterator_exception
:
1574 self
.unknown_exception
= e
1576 # We need to make sure to update total_tasks[caller_id] before we enqueue
1577 # the last task. Otherwise, a worker can retrieve the last task and
1578 # complete it, then check total_tasks and determine that we're not done
1579 # producing all before we update total_tasks. This approach forces workers
1580 # to wait on the last task until after we've updated total_tasks.
1581 total_tasks
[self
.caller_id
] = num_tasks
1583 # This happens if there were zero arguments to be put in the queue.
1584 cur_task
= Task(None, ZERO_TASKS_TO_DO_ARGUMENT
, self
.caller_id
,
1585 None, None, None, None)
1586 self
.task_queue
.put(cur_task
)
1588 # It's possible that the workers finished before we updated total_tasks,
1589 # so we need to check here as well.
1590 _NotifyIfDone(self
.caller_id
,
1591 caller_id_finished_count
.Get(self
.caller_id
))
1594 class WorkerPool(object):
1595 """Pool of worker threads to which tasks can be added."""
1597 def __init__(self
, thread_count
, logger
, bucket_storage_uri_class
=None,
1598 gsutil_api_map
=None, debug
=0):
1599 self
.task_queue
= _NewThreadsafeQueue()
1601 for _
in range(thread_count
):
1602 worker_thread
= WorkerThread(
1603 self
.task_queue
, logger
,
1604 bucket_storage_uri_class
=bucket_storage_uri_class
,
1605 gsutil_api_map
=gsutil_api_map
, debug
=debug
)
1606 self
.threads
.append(worker_thread
)
1607 worker_thread
.start()
1609 def AddTask(self
, task
):
1610 self
.task_queue
.put(task
)
1613 class WorkerThread(threading
.Thread
):
1614 """Thread where all the work will be performed.
1616 This makes the function calls for Apply and takes care of all error handling,
1617 return value propagation, and shared_vars.
1619 Note that this thread is NOT started upon instantiation because the function-
1620 calling logic is also used in the single-threaded case.
1623 def __init__(self
, task_queue
, logger
, bucket_storage_uri_class
=None,
1624 gsutil_api_map
=None, debug
=0):
1625 """Initializes the worker thread.
1628 task_queue: The thread-safe queue from which this thread should obtain
1630 logger: Logger to use for this thread.
1631 bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
1632 Settable for testing/mocking.
1633 gsutil_api_map: Map of providers and API selector tuples to api classes
1634 which can be used to communicate with those providers.
1635 Used for the instantiating CloudApiDelegator class.
1636 debug: debug level for the CloudApiDelegator class.
1638 super(WorkerThread
, self
).__init
__()
1639 self
.task_queue
= task_queue
1641 self
.cached_classes
= {}
1642 self
.shared_vars_updater
= _SharedVariablesUpdater()
1644 self
.thread_gsutil_api
= None
1645 if bucket_storage_uri_class
and gsutil_api_map
:
1646 self
.thread_gsutil_api
= CloudApiDelegator(
1647 bucket_storage_uri_class
, gsutil_api_map
, logger
, debug
=debug
)
1649 def PerformTask(self
, task
, cls
):
1650 """Makes the function call for a task.
1653 task: The Task to perform.
1654 cls: The instance of a class which gives context to the functions called
1655 by the Task's function. E.g., see SetAclFuncWrapper.
1657 caller_id
= task
.caller_id
1659 results
= task
.func(cls
, task
.args
, thread_state
=self
.thread_gsutil_api
)
1660 if task
.should_return_results
:
1661 global_return_values_map
.Update(caller_id
, [results
], default_value
=[])
1662 except Exception, e
: # pylint: disable=broad-except
1663 _IncrementFailureCount()
1664 if task
.fail_on_error
:
1665 raise # Only happens for single thread and process case.
1668 task
.exception_handler(cls
, e
)
1669 except Exception, _
: # pylint: disable=broad-except
1670 # Don't allow callers to raise exceptions here and kill the worker
1673 'Caught exception while handling exception for %s:\n%s',
1674 task
, traceback
.format_exc())
1676 self
.shared_vars_updater
.Update(caller_id
, cls
)
1678 # Even if we encounter an exception, we still need to claim that that
1679 # the function finished executing. Otherwise, we won't know when to
1680 # stop waiting and return results.
1681 num_done
= caller_id_finished_count
.Update(caller_id
, 1)
1683 if cls
.multiprocessing_is_available
:
1684 _NotifyIfDone(caller_id
, num_done
)
1688 task
= self
.task_queue
.get()
1689 caller_id
= task
.caller_id
1691 # Get the instance of the command with the appropriate context.
1692 cls
= self
.cached_classes
.get(caller_id
, None)
1694 cls
= copy
.copy(class_map
[caller_id
])
1695 cls
.logger
= CreateGsutilLogger(cls
.command_name
)
1696 self
.cached_classes
[caller_id
] = cls
1698 self
.PerformTask(task
, cls
)
1701 class _SharedVariablesUpdater(object):
1702 """Used to update shared variable for a class in the global map.
1704 Note that each thread will have its own instance of the calling class for
1705 context, and it will also have its own instance of a
1706 _SharedVariablesUpdater. This is used in the following way:
1708 1. Before any tasks are performed, each thread will get a copy of the
1709 calling class, and the globally-consistent value of this shared variable
1710 will be initialized to whatever it was before the call to Apply began.
1712 2. After each time a thread performs a task, it will look at the current
1713 values of the shared variables in its instance of the calling class.
1715 2.A. For each such variable, it computes the delta of this variable
1716 between the last known value for this class (which is stored in
1717 a dict local to this class) and the current value of the variable
1720 2.B. Using this delta, we update the last known value locally as well
1721 as the globally-consistent value shared across all classes (the
1722 globally consistent value is simply increased by the computed
1727 self
.last_shared_var_values
= {}
1729 def Update(self
, caller_id
, cls
):
1730 """Update any shared variables with their deltas."""
1731 shared_vars
= shared_vars_list_map
.get(caller_id
, None)
1733 for name
in shared_vars
:
1734 key
= (caller_id
, name
)
1735 last_value
= self
.last_shared_var_values
.get(key
, 0)
1736 # Compute the change made since the last time we updated here. This is
1737 # calculated by simply subtracting the last known value from the current
1738 # value in the class instance.
1739 delta
= getattr(cls
, name
) - last_value
1740 self
.last_shared_var_values
[key
] = delta
+ last_value
1742 # Update the globally-consistent value by simply increasing it by the
1744 shared_vars_map
.Update(key
, delta
)
1747 def _NotifyIfDone(caller_id
, num_done
):
1748 """Notify any threads waiting for results that something has finished.
1750 Each waiting thread will then need to check the call_completed_map to see if
1753 Note that num_done could be calculated here, but it is passed in as an
1754 optimization so that we have one less call to a globally-locked data
1758 caller_id: The caller_id of the function whose progress we're checking.
1759 num_done: The number of tasks currently completed for that caller_id.
1761 num_to_do
= total_tasks
[caller_id
]
1762 if num_to_do
== num_done
and num_to_do
>= 0:
1763 # Notify the Apply call that's sleeping that it's ready to return.
1764 with need_pool_or_done_cond
:
1765 call_completed_map
[caller_id
] = True
1766 need_pool_or_done_cond
.notify_all()
1769 def ShutDownGsutil():
1770 """Shut down all processes in consumer pools in preparation for exiting."""
1773 q
.cancel_join_thread()
1774 except: # pylint: disable=bare-except
1776 for consumer_pool
in consumer_pools
:
1777 consumer_pool
.ShutDown()
1780 # pylint: disable=global-variable-undefined
1781 def _IncrementFailureCount():
1782 global failure_count
1783 if isinstance(failure_count
, int):
1785 else: # Otherwise it's a multiprocessing.Value() of type 'i'.
1786 failure_count
.value
+= 1
1789 # pylint: disable=global-variable-undefined
1790 def GetFailureCount():
1791 """Returns the number of failures processed during calls to Apply()."""
1793 if isinstance(failure_count
, int):
1794 return failure_count
1795 else: # It's a multiprocessing.Value() of type 'i'.
1796 return failure_count
.value
1797 except NameError: # If it wasn't initialized, Apply() wasn't called.
1801 def ResetFailureCount():
1802 """Resets the failure_count variable to 0 - useful if error is expected."""
1804 global failure_count
1805 if isinstance(failure_count
, int):
1807 else: # It's a multiprocessing.Value() of type 'i'.
1808 failure_count
= multiprocessing
.Value('i', 0)
1809 except NameError: # If it wasn't initialized, Apply() wasn't called.