# -*- coding: utf-8 -*-
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains the perfdiag gsutil command."""

from __future__ import absolute_import

import calendar
from collections import defaultdict
import contextlib
import cStringIO
import datetime
import httplib
import json
import logging
import math
import multiprocessing
import os
import random
import re
import socket
import string
import subprocess
import tempfile
import time

import boto
import boto.gs.connection

import gslib
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import ServiceException
from gslib.cloud_api_helper import GetDownloadSerializationDict
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.commands import config
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.util import GetCloudApiInstance
from gslib.util import GetMaxRetryDelay
from gslib.util import HumanReadableToBytes
from gslib.util import IS_LINUX
from gslib.util import MakeBitsHumanReadable
from gslib.util import MakeHumanReadable
from gslib.util import Percentile
from gslib.util import ResumableThreshold

_SYNOPSIS = """
  gsutil perfdiag [-i in.json]
  gsutil perfdiag [-o out.json] [-n iterations] [-c processes]
      [-k threads] [-s size] [-t tests] url...
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The perfdiag command runs a suite of diagnostic tests for a given Google
  Storage bucket.

  The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
  the user has write permission. Several test files will be uploaded to and
  downloaded from this bucket. All test files will be deleted at the completion
  of the diagnostic if it finishes successfully.

  gsutil performance can be impacted by many factors at the client, server,
  and in-between, such as: CPU speed; available memory; the access path to the
  local disk; network bandwidth; contention and error rates along the path
  between gsutil and Google; operating system buffering configuration; and
  firewalls and other network elements. The perfdiag command is provided so
  that customers can run a known measurement suite when troubleshooting
  performance problems.

<B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B>
  If the Google Cloud Storage Team asks you to run a performance diagnostic
  please use the following command, and email the output file (output.json)
  to gs-team@google.com:

    gsutil perfdiag -o output.json gs://your-bucket

<B>OPTIONS</B>
  -n          Sets the number of iterations performed when downloading and
              uploading files during latency and throughput tests. Defaults to
              5.

  -c          Sets the number of processes to use while running throughput
              experiments. The default value is 1.

  -k          Sets the number of threads per process to use while running
              throughput experiments. Each process will receive an equal number
              of threads. The default value is 1.

  -s          Sets the size (in bytes) of the test file used to perform read
              and write throughput tests. The default is 1 MiB. This can also
              be specified using byte suffixes such as 500K or 1M. Note: these
              values are interpreted as multiples of 1024 (K=1024, M=1024*1024,
              etc.)

  -t          Sets the list of diagnostic tests to perform. The default is to
              run the lat, rthru, and wthru diagnostic tests. Must be a
              comma-separated list containing one or more of the following:

              lat
                 Runs N iterations (set with -n) of writing the file,
                 retrieving its metadata, reading the file, and deleting
                 the file. Records the latency of each operation.

              list
                 Write N (set with -n) objects to the bucket, record how long
                 it takes for the eventually consistent listing call to return
                 the N objects in its result, delete the N objects, then record
                 how long it takes listing to stop returning the N objects.
                 This test is off by default.

              rthru
                 Runs N (set with -n) read operations, with at most C
                 (set with -c) reads outstanding at any given time.

              wthru
                 Runs N (set with -n) write operations, with at most C
                 (set with -c) writes outstanding at any given time.
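
              For example, to run only the latency and read throughput tests:

                gsutil perfdiag -t lat,rthru gs://your-bucket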

  -m          Adds metadata to the result JSON file. Multiple -m values can be
              specified. Example:

                gsutil perfdiag -m "key1:value1" -m "key2:value2" \\
                    gs://bucketname/

              Each metadata key will be added to the top-level "metadata"
              dictionary in the output JSON file.

  -o          Writes the results of the diagnostic to an output file. The
              output is a JSON file containing system information and
              performance diagnostic results. The file can be read and
              reported later using the -i option.

  -i          Reads the JSON output file created using the -o command and
              prints a formatted description of the results.

<B>MEASURING AVAILABILITY</B>
  The perfdiag command ignores the boto num_retries configuration parameter.
  Instead, it always retries on HTTP errors in the 500 range and keeps track of
  how many 500 errors were encountered during the test. The availability
  measurement is reported at the end of the test.

  Note that HTTP responses are only recorded when the request was made in a
  single process. When using multiple processes or threads, read and write
  throughput measurements are performed in an external process, so the
  availability numbers reported won't include the throughput measurements.

<B>NOTE</B>
  The perfdiag command collects system information. It collects your IP
  address, executes DNS queries to Google servers and collects the results,
  and collects network statistics information from the output of netstat -s.
  It will also attempt to connect to your proxy server if you have one
  configured. None of this information will be sent to Google unless you
  choose to send it.
""")


class Error(Exception):
  """Base exception class for this module."""


class InvalidArgument(Error):
  """Raised on invalid arguments to functions."""


def _DownloadWrapper(cls, arg, thread_state=None):
  cls.Download(arg, thread_state=thread_state)


def _UploadWrapper(cls, arg, thread_state=None):
  cls.Upload(arg, thread_state=thread_state)


def _DeleteWrapper(cls, arg, thread_state=None):
  cls.Delete(arg, thread_state=thread_state)


def _PerfdiagExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))


def _DummyTrackerCallback(_):
  pass


class DummyFile(object):
  """A dummy, file-like object that throws away everything written to it."""

  def write(self, *args, **kwargs):  # pylint: disable=invalid-name
    pass


# Many functions in perfdiag re-define a temporary function based on a
# variable from a loop, resulting in a false positive from the linter.
# pylint: disable=cell-var-from-loop
class PerfDiagCommand(Command):
  """Implementation of gsutil perfdiag command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'perfdiag',
      command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
      usage_synopsis=_SYNOPSIS,
      supported_sub_args='n:c:k:s:t:m:i:o:',
      provider_url_ok=False,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudBucketURLsArgument(1)
      ]
  )

  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='perfdiag',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run performance diagnostic',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  # Byte sizes to use for latency testing files.
  # TODO: Consider letting the user specify these sizes with a configuration
  # parameter.
  test_file_sizes = (
      0,  # 0 bytes
      1024,  # 1 KiB
      102400,  # 100 KiB
      1048576,  # 1 MiB
  )

  # Names of the diagnostic tests.
  RTHRU = 'rthru'
  WTHRU = 'wthru'
  LAT = 'lat'
  LIST = 'list'

  # List of all diagnostic tests.
  ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)

  # List of diagnostic tests to run by default.
  DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)

  # Google Cloud Storage XML API endpoint host.
  XML_API_HOST = boto.config.get(
      'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
  # Google Cloud Storage XML API endpoint port.
  XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)

  # Maximum number of times to retry requests on 5xx errors.
  MAX_SERVER_ERROR_RETRIES = 5
  # Maximum number of times to retry requests on more serious errors like
  # the socket breaking.
  MAX_TOTAL_RETRIES = 10

  # The default buffer size in boto's Key object is set to 8 KiB. This becomes
  # a bottleneck at high throughput rates, so we increase it.
  KEY_BUFFER_SIZE = 16384

  # The maximum number of bytes to generate pseudo-randomly before beginning
  # to repeat bytes. This number was chosen as the next prime larger than
  # 5 MiB.
  MAX_UNIQUE_RANDOM_BYTES = 5242883
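  # For reference: 5 MiB = 5 * 1024 * 1024 = 5242880 bytes.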

  # Maximum amount of time, in seconds, we will wait for object listings to
  # reflect what we expect in the listing tests.
  MAX_LISTING_WAIT_TIME = 60.0

  def _Exec(self, cmd, raise_on_error=True, return_output=False,
            mute_stderr=False):
    """Executes a command in a subprocess.

    Args:
      cmd: List containing the command to execute.
      raise_on_error: Whether or not to raise an exception when a process exits
          with a non-zero return code.
      return_output: If set to True, the return value of the function is the
          stdout of the process.
      mute_stderr: If set to True, the stderr of the process is not printed to
          sys.stderr.

    Returns:
      The return code of the process or the stdout if return_output is set.

    Raises:
      Exception: If raise_on_error is set to True and any process exits with a
          non-zero return code.
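
    Example (illustrative):
      Capturing a command's output while tolerating a non-zero exit status,
      as is done for netstat later in this module:

        output = self._Exec(['netstat', '-s'], return_output=True,
                            raise_on_error=False)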
    """
    self.logger.debug('Running command: %s', cmd)
    stderr = subprocess.PIPE if mute_stderr else None
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
    (stdoutdata, _) = p.communicate()
    if raise_on_error and p.returncode:
      raise CommandException("Received non-zero return code (%d) from "
                             "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
    return stdoutdata if return_output else p.returncode

  def _SetUp(self):
    """Performs setup operations needed before diagnostics can be run."""

    # Stores test result data.
    self.results = {}
    # List of test files in a temporary location on disk for latency ops.
    self.latency_files = []
    # List of test objects to clean up in the test bucket.
    self.test_object_names = set()
    # Maps each test file path to its size in bytes.
    self.file_sizes = {}
    # Maps each test file to its contents as a string.
    self.file_contents = {}
    # Maps each test file to its MD5 hash.
    self.file_md5s = {}
    # Total number of HTTP requests made.
    self.total_requests = 0
    # Total number of HTTP 5xx errors.
    self.request_errors = 0
    # Number of responses, keyed by response code.
    self.error_responses_by_code = defaultdict(int)
    # Total number of socket errors.
    self.connection_breaks = 0

    def _MakeFile(file_size):
      """Creates a temporary file of the given size and returns its path."""
      fd, fpath = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file',
                                   text=False)
      self.file_sizes[fpath] = file_size
      random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
      total_bytes = 0
      file_contents = ''
      while total_bytes < file_size:
        num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes)
        file_contents += random_bytes[:num_bytes]
        total_bytes += num_bytes
      self.file_contents[fpath] = file_contents
      with os.fdopen(fd, 'wb') as f:
        f.write(self.file_contents[fpath])
      with open(fpath, 'rb') as f:
        self.file_md5s[fpath] = CalculateB64EncodedMd5FromContents(f)
      return fpath
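
    # Note: once file_size exceeds MAX_UNIQUE_RANDOM_BYTES, _MakeFile repeats
    # the same pseudo-random block, so larger test files are not unique random
    # data from end to end.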

    # Create files for latency tests.
    for file_size in self.test_file_sizes:
      fpath = _MakeFile(file_size)
      self.latency_files.append(fpath)

    # Creating a file for warming up the TCP connection.
    self.tcp_warmup_file = _MakeFile(5 * 1024 * 1024)  # 5 Mebibytes.
    # Remote file to use for TCP warmup.
    self.tcp_warmup_remote_file = (str(self.bucket_url) +
                                   os.path.basename(self.tcp_warmup_file))

    # Local file on disk for write throughput tests.
    self.thru_local_file = _MakeFile(self.thru_filesize)

    # Dummy file buffer to use for downloading that goes nowhere.
    self.discard_sink = DummyFile()

  def _TearDown(self):
    """Performs operations to clean things up after performing diagnostics."""
    for fpath in self.latency_files + [self.thru_local_file,
                                       self.tcp_warmup_file]:
      try:
        os.remove(fpath)
      except OSError:
        pass

    for object_name in self.test_object_names:

      def _Delete():
        try:
          self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
                                       object_name,
                                       provider=self.provider)
        except NotFoundException:
          pass

      self._RunOperation(_Delete)

  @contextlib.contextmanager
  def _Time(self, key, bucket):
    """A context manager that measures time.

    A context manager that prints a status message before and after executing
    the inner command and times how long the inner command takes. Keeps track
    of the timing, aggregated by the given key.

    Args:
      key: The key to insert the timing value into a dictionary bucket.
      bucket: A dictionary to place the timing value in.

    Yields:
      For the context manager.
    """
    self.logger.info('%s starting...', key)
    t0 = time.time()
    yield
    t1 = time.time()
    bucket[key].append(t1 - t0)
    self.logger.info('%s done.', key)

  def _RunOperation(self, func):
    """Runs an operation with retry logic.

    Args:
      func: The function to run.

    Returns:
      True if the operation succeeds, False if aborted.
    """
    # We retry on httplib exceptions that can happen if the socket was closed
    # by the remote party or the connection broke because of network issues.
    # Only the BotoServerError is counted as a 5xx error towards the retry
    # limit.
    success = False
    server_error_retried = 0
    total_retried = 0
    i = 0
    return_val = None
    while not success:
      next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
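      # The computed delay lies in [1, 1 + 2**i) seconds and is capped at
      # GetMaxRetryDelay(), e.g. between 1 and 2 seconds when i == 0.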
      try:
        return_val = func()
        self.total_requests += 1
        success = True
      except tuple(self.exceptions) as e:
        total_retried += 1
        if total_retried > self.MAX_TOTAL_RETRIES:
          self.logger.info('Reached maximum total retries. Not retrying.')
          break
        if isinstance(e, ServiceException):
          self.error_responses_by_code[e.status] += 1
          self.total_requests += 1
          self.request_errors += 1
          server_error_retried += 1
          time.sleep(next_sleep)
          if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
            self.logger.info(
                'Reached maximum server error retries. Not retrying.')
            break
        else:
          self.connection_breaks += 1
    return return_val

  def _RunLatencyTests(self):
    """Runs latency tests."""
    # Stores timing information for each category of operation.
    self.results['latency'] = defaultdict(list)

    for i in range(self.num_iterations):
      self.logger.info('\nRunning latency iteration %d...', i+1)
      for fpath in self.latency_files:
        url = self.bucket_url.Clone()
        url.object_name = os.path.basename(fpath)
        file_size = self.file_sizes[fpath]
        readable_file_size = MakeHumanReadable(file_size)

        self.logger.info(
            "\nFile of size %s located on disk at '%s' being diagnosed in the "
            "cloud at '%s'.", readable_file_size, fpath, url)

        upload_target = StorageUrlToUploadObjectMetadata(url)

        def _Upload():
          io_fp = cStringIO.StringIO(self.file_contents[fpath])
          with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
            self.gsutil_api.UploadObject(
                io_fp, upload_target, size=file_size, provider=self.provider,
                fields=['name'])
        self._RunOperation(_Upload)

        def _Metadata():
          with self._Time('METADATA_%d' % file_size, self.results['latency']):
            return self.gsutil_api.GetObjectMetadata(
                url.bucket_name, url.object_name,
                provider=self.provider, fields=['name', 'contentType',
                                                'mediaLink', 'size'])
        # Download will get the metadata first if we don't pass it in.
        download_metadata = self._RunOperation(_Metadata)
        serialization_dict = GetDownloadSerializationDict(download_metadata)
        serialization_data = json.dumps(serialization_dict)

        def _Download():
          with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
            self.gsutil_api.GetObjectMedia(
                url.bucket_name, url.object_name, self.discard_sink,
                provider=self.provider, serialization_data=serialization_data)
        self._RunOperation(_Download)

        def _Delete():
          with self._Time('DELETE_%d' % file_size, self.results['latency']):
            self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
                                         provider=self.provider)
        self._RunOperation(_Delete)

  class _CpFilter(logging.Filter):

    def filter(self, record):
      # Used to prevent cp._LogCopyOperation from spewing output from
      # subprocesses about every iteration.
      msg = record.getMessage()
      return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
                  ('Computing CRC' in msg))

  def _PerfdiagExceptionHandler(self, e):
    """Simple exception handler to allow post-completion status."""
    self.logger.error(str(e))

  def _RunReadThruTests(self):
    """Runs read throughput tests."""
    self.logger.info(
        '\nRunning read throughput tests (%s iterations of size %s)' %
        (self.num_iterations, MakeHumanReadable(self.thru_filesize)))

    self.results['read_throughput'] = {'file_size': self.thru_filesize,
                                       'num_times': self.num_iterations,
                                       'processes': self.processes,
                                       'threads': self.threads}

    # Copy the TCP warmup file.
    warmup_url = self.bucket_url.Clone()
    warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
    warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
    self.test_object_names.add(warmup_url.object_name)

    def _Upload1():
      self.gsutil_api.UploadObject(
          cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
          warmup_target, provider=self.provider, fields=['name'])
    self._RunOperation(_Upload1)

    # Copy the file to remote location before reading.
    thru_url = self.bucket_url.Clone()
    thru_url.object_name = os.path.basename(self.thru_local_file)
    thru_target = StorageUrlToUploadObjectMetadata(thru_url)
    thru_target.md5Hash = self.file_md5s[self.thru_local_file]
    self.test_object_names.add(thru_url.object_name)

    # Get the mediaLink here so that we can pass it to download.
    def _Upload2():
      return self.gsutil_api.UploadObject(
          cStringIO.StringIO(self.file_contents[self.thru_local_file]),
          thru_target, provider=self.provider, size=self.thru_filesize,
          fields=['name', 'mediaLink', 'size'])

    # Get the metadata for the object so that we are just measuring performance
    # on the actual bytes transfer.
    download_metadata = self._RunOperation(_Upload2)
    serialization_dict = GetDownloadSerializationDict(download_metadata)
    serialization_data = json.dumps(serialization_dict)

    if self.processes == 1 and self.threads == 1:

      # Warm up the TCP connection.
      def _Warmup():
        self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
                                       warmup_url.object_name,
                                       self.discard_sink,
                                       provider=self.provider)
      self._RunOperation(_Warmup)

      times = []

      def _Download():
        t0 = time.time()
        self.gsutil_api.GetObjectMedia(
            thru_url.bucket_name, thru_url.object_name, self.discard_sink,
            provider=self.provider, serialization_data=serialization_data)
        t1 = time.time()
        times.append(t1 - t0)
      for _ in range(self.num_iterations):
        self._RunOperation(_Download)
      time_took = sum(times)
    else:
      args = ([(thru_url.bucket_name, thru_url.object_name,
                serialization_data)] * self.num_iterations)
      self.logger.addFilter(self._CpFilter())

      t0 = time.time()
      self.Apply(_DownloadWrapper,
                 args,
                 _PerfdiagExceptionHandler,
                 arg_checker=DummyArgChecker,
                 parallel_operations_override=True,
                 process_count=self.processes,
                 thread_count=self.threads)
      t1 = time.time()
      time_took = t1 - t0

    total_bytes_copied = self.thru_filesize * self.num_iterations
    bytes_per_second = total_bytes_copied / time_took
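
    # Illustrative arithmetic: with the defaults (5 iterations of a 1 MiB
    # file) and a measured time_took of 2.0 seconds, this works out to
    # 5 * 1048576 / 2.0 = 2621440 bytes/s; _DisplayResults multiplies by 8
    # and reports the value in bits per second.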

    self.results['read_throughput']['time_took'] = time_took
    self.results['read_throughput']['total_bytes_copied'] = total_bytes_copied
    self.results['read_throughput']['bytes_per_second'] = bytes_per_second

  def _RunWriteThruTests(self):
    """Runs write throughput tests."""
    self.logger.info(
        '\nRunning write throughput tests (%s iterations of size %s)' %
        (self.num_iterations, MakeHumanReadable(self.thru_filesize)))

    self.results['write_throughput'] = {'file_size': self.thru_filesize,
                                        'num_copies': self.num_iterations,
                                        'processes': self.processes,
                                        'threads': self.threads}

    warmup_url = self.bucket_url.Clone()
    warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
    warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
    self.test_object_names.add(warmup_url.object_name)

    thru_url = self.bucket_url.Clone()
    thru_url.object_name = os.path.basename(self.thru_local_file)
    thru_target = StorageUrlToUploadObjectMetadata(thru_url)

    thru_tuples = []
    for i in xrange(self.num_iterations):
      # Create a unique name for each uploaded object. Otherwise,
      # the XML API would fail when trying to non-atomically get metadata
      # for the object that gets blown away by the overwrite.
      remote_object_name = thru_target.name + str(i)
      self.test_object_names.add(remote_object_name)
      thru_tuples.append(UploadObjectTuple(thru_target.bucket,
                                           remote_object_name,
                                           filepath=self.thru_local_file))

    if self.processes == 1 and self.threads == 1:

      # Warm up the TCP connection.
      def _Warmup():
        self.gsutil_api.UploadObject(
            cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
            warmup_target, provider=self.provider, size=self.thru_filesize,
            fields=['name'])
      self._RunOperation(_Warmup)

      times = []

      for i in xrange(self.num_iterations):
        thru_tuple = thru_tuples[i]

        def _Upload():
          """Uploads the write throughput measurement object."""
          upload_target = apitools_messages.Object(
              bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
              md5Hash=thru_tuple.md5)
          io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
          t0 = time.time()
          if self.thru_filesize < ResumableThreshold():
            self.gsutil_api.UploadObject(
                io_fp, upload_target, provider=self.provider,
                size=self.thru_filesize, fields=['name'])
          else:
            self.gsutil_api.UploadObjectResumable(
                io_fp, upload_target, provider=self.provider,
                size=self.thru_filesize, fields=['name'],
                tracker_callback=_DummyTrackerCallback)
          t1 = time.time()
          times.append(t1 - t0)

        self._RunOperation(_Upload)
      time_took = sum(times)
    else:
      t0 = time.time()
      self.Apply(_UploadWrapper,
                 thru_tuples,
                 _PerfdiagExceptionHandler,
                 arg_checker=DummyArgChecker,
                 parallel_operations_override=True,
                 process_count=self.processes,
                 thread_count=self.threads)
      t1 = time.time()
      time_took = t1 - t0

    total_bytes_copied = self.thru_filesize * self.num_iterations
    bytes_per_second = total_bytes_copied / time_took

    self.results['write_throughput']['time_took'] = time_took
    self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
    self.results['write_throughput']['bytes_per_second'] = bytes_per_second

  def _RunListTests(self):
    """Runs eventual consistency listing latency tests."""
    self.results['listing'] = {'num_files': self.num_iterations}

    # Generate N random object names to put in the bucket.
    list_prefix = 'gsutil-perfdiag-list-'
    list_objects = []
    for _ in xrange(self.num_iterations):
      list_object_name = u'%s%s' % (list_prefix, os.urandom(20).encode('hex'))
      self.test_object_names.add(list_object_name)
      list_objects.append(list_object_name)

    # Add the objects to the bucket.
    self.logger.info(
        '\nWriting %s objects for listing test...', self.num_iterations)
    empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
    args = [
        UploadObjectTuple(self.bucket_url.bucket_name, name, md5=empty_md5,
                          contents='') for name in list_objects]
    self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker)

    list_latencies = []
    files_seen = []
    total_start_time = time.time()
    expected_objects = set(list_objects)
    found_objects = set()

    def _List():
      """Lists and returns objects in the bucket. Also records latency."""
      t0 = time.time()
      objects = list(self.gsutil_api.ListObjects(
          self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
          provider=self.provider, fields=['items/name']))
      t1 = time.time()
      list_latencies.append(t1 - t0)
      return set([obj.data.name for obj in objects])

    self.logger.info(
        'Listing bucket %s waiting for %s objects to appear...',
        self.bucket_url.bucket_name, self.num_iterations)
    while expected_objects - found_objects:

      def _ListAfterUpload():
        names = _List()
        found_objects.update(names & expected_objects)
        files_seen.append(len(found_objects))
      self._RunOperation(_ListAfterUpload)
      if expected_objects - found_objects:
        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
          self.logger.warning('Maximum time reached waiting for listing.')
          break
    total_end_time = time.time()

    self.results['listing']['insert'] = {
        'num_listing_calls': len(list_latencies),
        'list_latencies': list_latencies,
        'files_seen_after_listing': files_seen,
        'time_took': total_end_time - total_start_time,
    }

    self.logger.info(
        'Deleting %s objects for listing test...', self.num_iterations)
    self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker)

    self.logger.info(
        'Listing bucket %s waiting for %s objects to disappear...',
        self.bucket_url.bucket_name, self.num_iterations)
    list_latencies = []
    files_seen = []
    total_start_time = time.time()
    found_objects = set(list_objects)
    while found_objects:

      def _ListAfterDelete():
        names = _List()
        found_objects.intersection_update(names)
        files_seen.append(len(found_objects))
      self._RunOperation(_ListAfterDelete)
      if found_objects:
        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
          self.logger.warning('Maximum time reached waiting for listing.')
          break
    total_end_time = time.time()

    self.results['listing']['delete'] = {
        'num_listing_calls': len(list_latencies),
        'list_latencies': list_latencies,
        'files_seen_after_listing': files_seen,
        'time_took': total_end_time - total_start_time,
    }

  def Upload(self, thru_tuple, thread_state=None):
    gsutil_api = GetCloudApiInstance(self, thread_state)

    md5hash = thru_tuple.md5
    contents = thru_tuple.contents
    if thru_tuple.filepath:
      md5hash = self.file_md5s[thru_tuple.filepath]
      contents = self.file_contents[thru_tuple.filepath]

    upload_target = apitools_messages.Object(
        bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
        md5Hash=md5hash)
    file_size = len(contents)
    if file_size < ResumableThreshold():
      gsutil_api.UploadObject(
          cStringIO.StringIO(contents), upload_target,
          provider=self.provider, size=file_size, fields=['name'])
    else:
      gsutil_api.UploadObjectResumable(
          cStringIO.StringIO(contents), upload_target,
          provider=self.provider, size=file_size, fields=['name'],
          tracker_callback=_DummyTrackerCallback)

  def Download(self, download_tuple, thread_state=None):
    """Downloads the object described by the download tuple.

    Args:
      download_tuple: (bucket name, object name, serialization data for object).
      thread_state: gsutil Cloud API instance to use for the download.
    """
    gsutil_api = GetCloudApiInstance(self, thread_state)
    gsutil_api.GetObjectMedia(
        download_tuple[0], download_tuple[1], self.discard_sink,
        provider=self.provider, serialization_data=download_tuple[2])

  def Delete(self, thru_tuple, thread_state=None):
    gsutil_api = thread_state or self.gsutil_api
    gsutil_api.DeleteObject(
        thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)

  def _GetDiskCounters(self):
    """Retrieves disk I/O statistics for all disks.

    Adapted from the psutil module's psutil._pslinux.disk_io_counters:
      http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py

    Originally distributed under under a BSD license.
    Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.

    Returns:
      A dictionary containing disk names mapped to the disk counters from
      /proc/diskstats.
    """
    # iostat documentation states that sectors are equivalent with blocks and
    # have a size of 512 bytes since 2.4 kernels. This value is needed to
    # calculate the amount of disk I/O in bytes.
    sector_size = 512

    partitions = []
    with open('/proc/partitions', 'r') as f:
      lines = f.readlines()[2:]
      for line in lines:
        _, _, _, name = line.split()
        if name[-1].isdigit():
          partitions.append(name)

    retdict = {}
    with open('/proc/diskstats', 'r') as f:
      for line in f:
        values = line.split()[:11]
        _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values
        if name in partitions:
          rbytes = int(rbytes) * sector_size
          wbytes = int(wbytes) * sector_size
          reads = int(reads)
          writes = int(writes)
          rtime = int(rtime)
          wtime = int(wtime)
          retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime)
    return retdict

  def _GetTcpStats(self):
    """Tries to parse out TCP packet information from netstat output.

    Returns:
      A dictionary containing TCP information, or None if netstat is not
      available.
    """
    # netstat return code is non-zero for -s on Linux, so don't raise on error.
    try:
      netstat_output = self._Exec(['netstat', '-s'], return_output=True,
                                  raise_on_error=False)
    except OSError:
      self.logger.warning('netstat not found on your system; some measurement '
                          'data will be missing')
      return None
    netstat_output = netstat_output.strip().lower()
    found_tcp = False
    tcp_retransmit = None
    tcp_received = None
    tcp_sent = None
    for line in netstat_output.split('\n'):
      # Header for TCP section is "Tcp:" in Linux/Mac and
      # "TCP Statistics for" in Windows.
      if 'tcp:' in line or 'tcp statistics' in line:
        found_tcp = True

      # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts"
      # Windows == "segments retransmitted".
      if (found_tcp and tcp_retransmit is None and
          ('segments retransmited' in line or 'retransmit timeouts' in line or
           'segments retransmitted' in line)):
        tcp_retransmit = ''.join(c for c in line if c in string.digits)

      # Linux+Windows == "segments received", Mac == "packets received".
      if (found_tcp and tcp_received is None and
          ('segments received' in line or 'packets received' in line)):
        tcp_received = ''.join(c for c in line if c in string.digits)

      # Linux == "segments send out" (sic), Mac+Windows == "packets sent".
      if (found_tcp and tcp_sent is None and
          ('segments send out' in line or 'packets sent' in line or
           'segments sent' in line)):
        tcp_sent = ''.join(c for c in line if c in string.digits)

    result = {}
    try:
      result['tcp_retransmit'] = int(tcp_retransmit)
      result['tcp_received'] = int(tcp_received)
      result['tcp_sent'] = int(tcp_sent)
    except (ValueError, TypeError):
      result['tcp_retransmit'] = None
      result['tcp_received'] = None
      result['tcp_sent'] = None

    return result

  def _CollectSysInfo(self):
    """Collects system information."""
    sysinfo = {}

    # All exceptions that might be raised from socket module calls.
    socket_errors = (
        socket.error, socket.herror, socket.gaierror, socket.timeout)

    # Find out whether HTTPS is enabled in Boto.
    sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)

    # Look up proxy info.
    proxy_host = boto.config.get('Boto', 'proxy', None)
    proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
    sysinfo['using_proxy'] = bool(proxy_host)

    if boto.config.get('Boto', 'proxy_rdns', False):
      self.logger.info('DNS lookups are disallowed in this environment, so '
                       'some information is not included in this perfdiag '
                       'run.')

    # Get the local IP address from socket lib.
    try:
      sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
    except socket_errors:
      sysinfo['ip_address'] = ''
    # Record the temporary directory used since it can affect performance, e.g.
    # when on a networked filesystem.
    sysinfo['tempdir'] = tempfile.gettempdir()

    # Produces an RFC 2822 compliant GMT timestamp.
    sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
                                             time.gmtime())

    # Execute a CNAME lookup on Google DNS to find what Google server
    # it's routing to.
    cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
    try:
      nslookup_cname_output = self._Exec(cmd, return_output=True)
      m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
      sysinfo['googserv_route'] = m.group('googserv') if m else None
    except (CommandException, OSError):
      sysinfo['googserv_route'] = ''

    # Try to determine the latency of a DNS lookup for the Google hostname
    # endpoint. Note: we don't piggyback on gethostbyname_ex below because
    # the _ex version requires an extra RTT.
    try:
      t0 = time.time()
      socket.gethostbyname(self.XML_API_HOST)
      t1 = time.time()
      sysinfo['google_host_dns_latency'] = t1 - t0
    except socket_errors:
      pass

    # Look up IP addresses for Google Server.
    try:
      (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST)
      sysinfo['googserv_ips'] = ipaddrlist
    except socket_errors:
      ipaddrlist = []
      sysinfo['googserv_ips'] = []

    # Reverse lookup the hostnames for the Google Server IPs.
    sysinfo['googserv_hostnames'] = []
    for googserv_ip in ipaddrlist:
      try:
        (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
        sysinfo['googserv_hostnames'].append(hostname)
      except socket_errors:
        pass

    # Query o-o to find out what the Google DNS thinks is the user's IP.
    try:
      cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.']
      nslookup_txt_output = self._Exec(cmd, return_output=True)
      m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
      sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
    except (CommandException, OSError):
      sysinfo['dns_o-o_ip'] = ''

    # Try to determine the latency of connecting to the Google hostname
    # endpoint.
    sysinfo['google_host_connect_latencies'] = {}
    for googserv_ip in ipaddrlist:
      try:
        sock = socket.socket()
        t0 = time.time()
        sock.connect((googserv_ip, self.XML_API_PORT))
        t1 = time.time()
        sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
      except socket_errors:
        pass

    # If using a proxy, try to determine the latency of a DNS lookup to resolve
    # the proxy hostname and the latency of connecting to the proxy.
    if proxy_host:
      proxy_ip = None
      try:
        t0 = time.time()
        proxy_ip = socket.gethostbyname(proxy_host)
        t1 = time.time()
        sysinfo['proxy_dns_latency'] = t1 - t0
      except socket_errors:
        pass

      try:
        sock = socket.socket()
        t0 = time.time()
        sock.connect((proxy_ip or proxy_host, proxy_port))
        t1 = time.time()
        sysinfo['proxy_host_connect_latency'] = t1 - t0
      except socket_errors:
        pass

    # Try and find the number of CPUs in the system if available.
    try:
      sysinfo['cpu_count'] = multiprocessing.cpu_count()
    except NotImplementedError:
      sysinfo['cpu_count'] = None

    # For *nix platforms, obtain the CPU load.
    try:
      sysinfo['load_avg'] = list(os.getloadavg())
    except (AttributeError, OSError):
      sysinfo['load_avg'] = None

    # Try and collect memory information from /proc/meminfo if possible.
    mem_total = None
    mem_free = None
    mem_buffers = None
    mem_cached = None
    try:
      with open('/proc/meminfo', 'r') as f:
        for line in f:
          if line.startswith('MemTotal'):
            mem_total = (int(''.join(c for c in line if c in string.digits))
                         * 1000)
          elif line.startswith('MemFree'):
            mem_free = (int(''.join(c for c in line if c in string.digits))
                        * 1000)
          elif line.startswith('Buffers'):
            mem_buffers = (int(''.join(c for c in line if c in string.digits))
                           * 1000)
          elif line.startswith('Cached'):
            mem_cached = (int(''.join(c for c in line if c in string.digits))
                          * 1000)
    except (IOError, ValueError):
      pass

    sysinfo['meminfo'] = {'mem_total': mem_total,
                          'mem_free': mem_free,
                          'mem_buffers': mem_buffers,
                          'mem_cached': mem_cached}

    # Get configuration attributes from config module.
    sysinfo['gsutil_config'] = {}
    for attr in dir(config):
      attr_value = getattr(config, attr)
      # Filter out multiline strings that are not useful.
      if attr.isupper() and not (isinstance(attr_value, basestring) and
                                 '\n' in attr_value):
        sysinfo['gsutil_config'][attr] = attr_value

    sysinfo['tcp_proc_values'] = {}
    stats_to_check = [
        '/proc/sys/net/core/rmem_default',
        '/proc/sys/net/core/rmem_max',
        '/proc/sys/net/core/wmem_default',
        '/proc/sys/net/core/wmem_max',
        '/proc/sys/net/ipv4/tcp_timestamps',
        '/proc/sys/net/ipv4/tcp_sack',
        '/proc/sys/net/ipv4/tcp_window_scaling',
    ]
    for fname in stats_to_check:
      if os.path.exists(fname):
        with open(fname, 'r') as f:
          value = f.read()
        sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip()

    self.results['sysinfo'] = sysinfo

  def _DisplayStats(self, trials):
    """Prints out mean, standard deviation, median, and 90th percentile."""
    n = len(trials)
    mean = float(sum(trials)) / n
    stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n)

    print str(n).rjust(6), '',
    print ('%.1f' % (mean * 1000)).rjust(9), '',
    print ('%.1f' % (stdev * 1000)).rjust(12), '',
    print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '',
    print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), ''

  def _DisplayResults(self):
    """Displays results collected from diagnostic run."""
    print
    print '=' * 78
    print 'DIAGNOSTIC RESULTS'.center(78)
    print '=' * 78

    if 'latency' in self.results:
      print
      print '-' * 78
      print 'Latency'.center(78)
      print '-' * 78
      print ('Operation       Size  Trials  Mean (ms)  Std Dev (ms)  '
             'Median (ms)  90th % (ms)')
      print ('=========  =========  ======  =========  ============  '
             '===========  ===========')
      for key in sorted(self.results['latency']):
        trials = sorted(self.results['latency'][key])
        op, numbytes = key.split('_')
        numbytes = int(numbytes)
        if op == 'METADATA':
          print 'Metadata'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)
        if op == 'DOWNLOAD':
          print 'Download'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)
        if op == 'UPLOAD':
          print 'Upload'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)
        if op == 'DELETE':
          print 'Delete'.rjust(9), '',
          print MakeHumanReadable(numbytes).rjust(9), '',
          self._DisplayStats(trials)

    if 'write_throughput' in self.results:
      print
      print '-' * 78
      print 'Write Throughput'.center(78)
      print '-' * 78
      write_thru = self.results['write_throughput']
      print 'Copied a %s file %d times for a total transfer size of %s.' % (
          MakeHumanReadable(write_thru['file_size']),
          write_thru['num_copies'],
          MakeHumanReadable(write_thru['total_bytes_copied']))
      print 'Write throughput: %s/s.' % (
          MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))

    if 'read_throughput' in self.results:
      print
      print '-' * 78
      print 'Read Throughput'.center(78)
      print '-' * 78
      read_thru = self.results['read_throughput']
      print 'Copied a %s file %d times for a total transfer size of %s.' % (
          MakeHumanReadable(read_thru['file_size']),
          read_thru['num_times'],
          MakeHumanReadable(read_thru['total_bytes_copied']))
      print 'Read throughput: %s/s.' % (
          MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))

    if 'listing' in self.results:
      print
      print '-' * 78
      print 'Listing'.center(78)
      print '-' * 78
      listing = self.results['listing']
      insert = listing['insert']
      delete = listing['delete']
      print 'After inserting %s objects:' % listing['num_files']
      print ('  Total time for objects to appear: %.2g seconds' %
             insert['time_took'])
      print '  Number of listing calls made: %s' % insert['num_listing_calls']
      print ('  Individual listing call latencies: [%s]' %
             ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
      print ('  Files reflected after each call: [%s]' %
             ', '.join(map(str, insert['files_seen_after_listing'])))

      print 'After deleting %s objects:' % listing['num_files']
      print ('  Total time for objects to appear: %.2g seconds' %
             delete['time_took'])
      print '  Number of listing calls made: %s' % delete['num_listing_calls']
      print ('  Individual listing call latencies: [%s]' %
             ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
      print ('  Files reflected after each call: [%s]' %
             ', '.join(map(str, delete['files_seen_after_listing'])))

    if 'sysinfo' in self.results:
      print
      print '-' * 78
      print 'System Information'.center(78)
      print '-' * 78
      info = self.results['sysinfo']
      print 'IP Address: \n  %s' % info['ip_address']
      print 'Temporary Directory: \n  %s' % info['tempdir']
      print 'Bucket URI: \n  %s' % self.results['bucket_uri']
      print 'gsutil Version: \n  %s' % self.results.get('gsutil_version',
                                                        'Unknown')
      print 'boto Version: \n  %s' % self.results.get('boto_version', 'Unknown')

      if 'gmt_timestamp' in info:
        ts_string = info['gmt_timestamp']
        try:
          # Convert RFC 2822 string to Linux timestamp.
          timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000')
        except ValueError:
          timetuple = None

        if timetuple:
          # Converts the GMT time tuple to local Linux timestamp.
          localtime = calendar.timegm(timetuple)
          localdt = datetime.datetime.fromtimestamp(localtime)
          print 'Measurement time: \n %s' % localdt.strftime(
              '%Y-%m-%d %I:%M:%S %p %Z')

      print 'Google Server: \n  %s' % info['googserv_route']
      print ('Google Server IP Addresses: \n  %s' %
             ('\n  '.join(info['googserv_ips'])))
      print ('Google Server Hostnames: \n  %s' %
             ('\n  '.join(info['googserv_hostnames'])))
      print 'Google DNS thinks your IP is: \n  %s' % info['dns_o-o_ip']
      print 'CPU Count: \n  %s' % info['cpu_count']
      print 'CPU Load Average: \n  %s' % info['load_avg']
      if 'meminfo' in info and info['meminfo'].get('mem_total'):
        print ('Total Memory: \n  %s' %
               MakeHumanReadable(info['meminfo']['mem_total']))
        # Free memory is really MemFree + Buffers + Cached.
        print 'Free Memory: \n  %s' % MakeHumanReadable(
            info['meminfo']['mem_free'] +
            info['meminfo']['mem_buffers'] +
            info['meminfo']['mem_cached'])

      if 'netstat_end' in info and 'netstat_start' in info:
        netstat_after = info['netstat_end']
        netstat_before = info['netstat_start']
        for tcp_type in ('sent', 'received', 'retransmit'):
          try:
            delta = (netstat_after['tcp_%s' % tcp_type] -
                     netstat_before['tcp_%s' % tcp_type])
            print 'TCP segments %s during test:\n  %d' % (tcp_type, delta)
          except TypeError:
            pass
      else:
        print ('TCP segment counts not available because "netstat" was not '
               'found during test runs')

      if 'disk_counters_end' in info and 'disk_counters_start' in info:
        print 'Disk Counter Deltas:\n',
        disk_after = info['disk_counters_end']
        disk_before = info['disk_counters_start']
        print '', 'disk'.rjust(6),
        for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime',
                        'wtime']:
          print colname.rjust(8),
        print
        for diskname in sorted(disk_after):
          before = disk_before[diskname]
          after = disk_after[diskname]
          (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before
          (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after
          print '', diskname.rjust(6),
          deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1,
                    wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1]
          for delta in deltas:
            print str(delta).rjust(8),
          print

      if 'tcp_proc_values' in info:
        print 'TCP /proc values:\n',
        for item in info['tcp_proc_values'].iteritems():
          print '   %s = %s' % item

      if 'boto_https_enabled' in info:
        print 'Boto HTTPS Enabled: \n  %s' % info['boto_https_enabled']

      if 'using_proxy' in info:
        print 'Requests routed through proxy: \n  %s' % info['using_proxy']

      if 'google_host_dns_latency' in info:
        print ('Latency of the DNS lookup for Google Storage server (ms): '
               '\n  %.1f' % (info['google_host_dns_latency'] * 1000.0))

      if 'google_host_connect_latencies' in info:
        print 'Latencies connecting to Google Storage server IPs (ms):'
        for ip, latency in info['google_host_connect_latencies'].iteritems():
          print '  %s = %.1f' % (ip, latency * 1000.0)

      if 'proxy_dns_latency' in info:
        print ('Latency of the DNS lookup for the configured proxy (ms): '
               '\n  %.1f' % (info['proxy_dns_latency'] * 1000.0))

      if 'proxy_host_connect_latency' in info:
        print ('Latency connecting to the configured proxy (ms): \n  %.1f' %
               (info['proxy_host_connect_latency'] * 1000.0))

    if 'request_errors' in self.results and 'total_requests' in self.results:
      print
      print '-' * 78
      print 'In-Process HTTP Statistics'.center(78)
      print '-' * 78
      total = int(self.results['total_requests'])
      numerrors = int(self.results['request_errors'])
      numbreaks = int(self.results['connection_breaks'])
      availability = (((total - numerrors) / float(total)) * 100
                      if total > 0 else 100)
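      # For example, 200 total requests with 2 HTTP 5xx errors yields
      # ((200 - 2) / 200.0) * 100 = 99.0% availability.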
      print 'Total HTTP requests made: %d' % total
      print 'HTTP 5xx errors: %d' % numerrors
      print 'HTTP connections broken: %d' % numbreaks
      print 'Availability: %.7g%%' % availability
      if 'error_responses_by_code' in self.results:
        sorted_codes = sorted(
            self.results['error_responses_by_code'].iteritems())
        if sorted_codes:
          print 'Error responses by code:'
          print '\n'.join('  %s: %s' % c for c in sorted_codes)

    if self.output_file:
      with open(self.output_file, 'w') as f:
        json.dump(self.results, f, indent=2)
      print
      print "Output file written to '%s'." % self.output_file

  def _ParsePositiveInteger(self, val, msg):
    """Tries to convert val argument to a positive integer.

    Args:
      val: The value (as a string) to convert to a positive integer.
      msg: The error message to place in the CommandException on an error.

    Returns:
      A valid positive integer.

    Raises:
      CommandException: If the supplied value is not a valid positive integer.
    """
    try:
      val = int(val)
      if val < 1:
        raise CommandException(msg)
      return val
    except ValueError:
      raise CommandException(msg)

  def _ParseArgs(self):
    """Parses arguments for perfdiag command."""
    # From -n.
    self.num_iterations = 5
    # From -c.
    self.processes = 1
    # From -k.
    self.threads = 1
    # From -s.
    self.thru_filesize = 1048576
    # From -t.
    self.diag_tests = self.DEFAULT_DIAG_TESTS
    # From -o.
    self.output_file = None
    # From -i.
    self.input_file = None
    # From -m.
    self.metadata_keys = {}

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-n':
          self.num_iterations = self._ParsePositiveInteger(
              a, 'The -n parameter must be a positive integer.')
        if o == '-c':
          self.processes = self._ParsePositiveInteger(
              a, 'The -c parameter must be a positive integer.')
        if o == '-k':
          self.threads = self._ParsePositiveInteger(
              a, 'The -k parameter must be a positive integer.')
        if o == '-s':
          try:
            self.thru_filesize = HumanReadableToBytes(a)
          except ValueError:
            raise CommandException('Invalid -s parameter.')
          if self.thru_filesize > (20 * 1024 ** 3):  # Max 20 GiB.
            raise CommandException(
                'Maximum throughput file size parameter (-s) is 20 GiB.')
        if o == '-t':
          self.diag_tests = []
          for test_name in a.strip().split(','):
            if test_name.lower() not in self.ALL_DIAG_TESTS:
              raise CommandException("List of test names (-t) contains invalid "
                                     "test name '%s'." % test_name)
            self.diag_tests.append(test_name)
        if o == '-m':
          pieces = a.split(':')
          if len(pieces) != 2:
            raise CommandException(
                "Invalid metadata key-value combination '%s'." % a)
          key, value = pieces
          self.metadata_keys[key] = value
        if o == '-o':
          self.output_file = os.path.abspath(a)
        if o == '-i':
          self.input_file = os.path.abspath(a)
          if not os.path.isfile(self.input_file):
            raise CommandException("Invalid input file (-i): '%s'." % a)
          try:
            with open(self.input_file, 'r') as f:
              self.results = json.load(f)
              self.logger.info("Read input file: '%s'.", self.input_file)
          except ValueError:
            raise CommandException("Could not decode input file (-i): '%s'." %
                                   a)
          return

    if not self.args:
      self.RaiseWrongNumberOfArgumentsException()

    self.bucket_url = StorageUrlFromString(self.args[0])
    self.provider = self.bucket_url.scheme
    if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
      raise CommandException('The perfdiag command requires a URL that '
                             'specifies a bucket.\n"%s" is not '
                             'valid.' % self.args[0])
    # Ensure the bucket exists.
    self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
                              provider=self.bucket_url.scheme,
                              fields=['id'])
    self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
                       socket.timeout, httplib.BadStatusLine,
                       ServiceException]

  # Command entry point.
  def RunCommand(self):
    """Called by gsutil when the command is being invoked."""
    self._ParseArgs()

    if self.input_file:
      self._DisplayResults()
      return 0

    # We turn off retries in the underlying boto library because the
    # _RunOperation function handles errors manually so it can count them.
    boto.config.set('Boto', 'num_retries', '0')

    self.logger.info(
        'Number of iterations to run: %d\n'
        'Base bucket URI: %s\n'
        'Number of processes: %d\n'
        'Number of threads: %d\n'
        'Throughput file size: %s\n'
        'Diagnostics to run: %s',
        self.num_iterations,
        self.bucket_url,
        self.processes,
        self.threads,
        MakeHumanReadable(self.thru_filesize),
        (', '.join(self.diag_tests)))

    try:
      self._SetUp()

      # Collect generic system info.
      self._CollectSysInfo()
      # Collect netstat info and disk counters before tests (and again later).
      netstat_output = self._GetTcpStats()
      if netstat_output:
        self.results['sysinfo']['netstat_start'] = netstat_output
      if IS_LINUX:
        self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
      # Record bucket URL.
      self.results['bucket_uri'] = str(self.bucket_url)
      self.results['json_format'] = 'perfdiag'
      self.results['metadata'] = self.metadata_keys

      if self.LAT in self.diag_tests:
        self._RunLatencyTests()
      if self.RTHRU in self.diag_tests:
        self._RunReadThruTests()
      if self.WTHRU in self.diag_tests:
        self._RunWriteThruTests()
      if self.LIST in self.diag_tests:
        self._RunListTests()

      # Collect netstat info and disk counters after tests.
      netstat_output = self._GetTcpStats()
      if netstat_output:
        self.results['sysinfo']['netstat_end'] = netstat_output
      if IS_LINUX:
        self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters()

      self.results['total_requests'] = self.total_requests
      self.results['request_errors'] = self.request_errors
      self.results['error_responses_by_code'] = self.error_responses_by_code
      self.results['connection_breaks'] = self.connection_breaks
      self.results['gsutil_version'] = gslib.VERSION
      self.results['boto_version'] = boto.__version__

      self._DisplayResults()
    finally:
      # TODO: Install signal handlers so this is performed in response to a
      # terminating signal; consider multi-threaded object deletes during
      # cleanup so it happens quickly.
      self._TearDown()

    return 0


class UploadObjectTuple(object):
  """Picklable tuple with necessary metadata for an insert object call."""

  def __init__(self, bucket_name, object_name, filepath=None, md5=None,
               contents=None):
    """Create an upload tuple.

    Args:
      bucket_name: Name of the bucket to upload to.
      object_name: Name of the object to upload to.
      filepath: A file path located in self.file_contents and self.file_md5s.
      md5: The MD5 hash of the object being uploaded.
      contents: The contents of the file to be uploaded.

    Note: (contents + md5) and filepath are mutually exclusive. You may specify
          one or the other, but not both.
    Note: If one of contents or md5 is specified, they must both be specified.

    Raises:
      InvalidArgument: if the arguments are invalid.
    """
    self.bucket_name = bucket_name
    self.object_name = object_name
    self.filepath = filepath
    self.md5 = md5
    self.contents = contents
    if filepath and (md5 or contents is not None):
      raise InvalidArgument(
          'Only one of filepath or (md5 + contents) may be specified.')
    if not filepath and (not md5 or contents is None):
      raise InvalidArgument(
          'Both md5 and contents must be specified.')


def StorageUrlToUploadObjectMetadata(storage_url):
  if storage_url.IsCloudUrl() and storage_url.IsObject():
    upload_target = apitools_messages.Object()
    upload_target.name = storage_url.object_name
    upload_target.bucket = storage_url.bucket_name
    return upload_target
  else:
    raise CommandException('Non-cloud URL upload target %s was created in '
                           'perfdiag implementation.' % storage_url)