Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / commands / perfdiag.py
blobd88eae78c1242e823b9be169e8f2f9f103a15a23
1 # -*- coding: utf-8 -*-
2 # Copyright 2012 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Contains the perfdiag gsutil command."""
17 from __future__ import absolute_import
19 import calendar
20 from collections import defaultdict
21 import contextlib
22 import cStringIO
23 import datetime
24 import httplib
25 import json
26 import logging
27 import math
28 import multiprocessing
29 import os
30 import random
31 import re
32 import socket
33 import string
34 import subprocess
35 import tempfile
36 import time
38 import boto
39 import boto.gs.connection
41 import gslib
42 from gslib.cloud_api import NotFoundException
43 from gslib.cloud_api import ServiceException
44 from gslib.cloud_api_helper import GetDownloadSerializationDict
45 from gslib.command import Command
46 from gslib.command import DummyArgChecker
47 from gslib.command_argument import CommandArgument
48 from gslib.commands import config
49 from gslib.cs_api_map import ApiSelector
50 from gslib.exception import CommandException
51 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
52 from gslib.storage_url import StorageUrlFromString
53 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
54 from gslib.util import GetCloudApiInstance
55 from gslib.util import GetMaxRetryDelay
56 from gslib.util import HumanReadableToBytes
57 from gslib.util import IS_LINUX
58 from gslib.util import MakeBitsHumanReadable
59 from gslib.util import MakeHumanReadable
60 from gslib.util import Percentile
61 from gslib.util import ResumableThreshold
64 _SYNOPSIS = """
65 gsutil perfdiag [-i in.json]
66 gsutil perfdiag [-o out.json] [-n iterations] [-c processes]
67 [-k threads] [-s size] [-t tests] url...
68 """
70 _DETAILED_HELP_TEXT = ("""
71 <B>SYNOPSIS</B>
72 """ + _SYNOPSIS + """
75 <B>DESCRIPTION</B>
76 The perfdiag command runs a suite of diagnostic tests for a given Google
77 Storage bucket.
79 The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
80 the user has write permission. Several test files will be uploaded to and
81 downloaded from this bucket. All test files will be deleted at the completion
82 of the diagnostic if it finishes successfully.
84 gsutil performance can be impacted by many factors at the client, server,
85 and in-between, such as: CPU speed; available memory; the access path to the
86 local disk; network bandwidth; contention and error rates along the path
87 between gsutil and Google; operating system buffering configuration; and
88 firewalls and other network elements. The perfdiag command is provided so
89 that customers can run a known measurement suite when troubleshooting
90 performance problems.
93 <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B>
94 If the Google Cloud Storage Team asks you to run a performance diagnostic
95 please use the following command, and email the output file (output.json)
96 to gs-team@google.com:
98 gsutil perfdiag -o output.json gs://your-bucket
101 <B>OPTIONS</B>
102 -n Sets the number of iterations performed when downloading and
103 uploading files during latency and throughput tests. Defaults to
106 -c Sets the number of processes to use while running throughput
107 experiments. The default value is 1.
109 -k Sets the number of threads per process to use while running
110 throughput experiments. Each process will receive an equal number
111 of threads. The default value is 1.
113 -s Sets the size (in bytes) of the test file used to perform read
114 and write throughput tests. The default is 1 MiB. This can also
115 be specified using byte suffixes such as 500K or 1M. Note: these
116 values are interpreted as multiples of 1024 (K=1024, M=1024*1024,
117 etc.)
119 -t Sets the list of diagnostic tests to perform. The default is to
120 run all diagnostic tests. Must be a comma-separated list
121 containing one or more of the following:
124 Runs N iterations (set with -n) of writing the file,
125 retrieving its metadata, reading the file, and deleting
126 the file. Records the latency of each operation.
128 list
129 Write N (set with -n) objects to the bucket, record how long
130 it takes for the eventually consistent listing call to return
131 the N objects in its result, delete the N objects, then record
132 how long it takes listing to stop returning the N objects.
133 This test is off by default.
135 rthru
136 Runs N (set with -n) read operations, with at most C
137 (set with -c) reads outstanding at any given time.
139 wthru
140 Runs N (set with -n) write operations, with at most C
141 (set with -c) writes outstanding at any given time.
143 -m Adds metadata to the result JSON file. Multiple -m values can be
144 specified. Example:
146 gsutil perfdiag -m "key1:value1" -m "key2:value2" \
147 gs://bucketname/
149 Each metadata key will be added to the top-level "metadata"
150 dictionary in the output JSON file.
152 -o Writes the results of the diagnostic to an output file. The output
153 is a JSON file containing system information and performance
154 diagnostic results. The file can be read and reported later using
155 the -i option.
157 -i Reads the JSON output file created using the -o command and prints
158 a formatted description of the results.
161 <B>MEASURING AVAILABILITY</B>
162 The perfdiag command ignores the boto num_retries configuration parameter.
163 Instead, it always retries on HTTP errors in the 500 range and keeps track of
164 how many 500 errors were encountered during the test. The availability
165 measurement is reported at the end of the test.
167 Note that HTTP responses are only recorded when the request was made in a
168 single process. When using multiple processes or threads, read and write
169 throughput measurements are performed in an external process, so the
170 availability numbers reported won't include the throughput measurements.
173 <B>NOTE</B>
174 The perfdiag command collects system information. It collects your IP address,
175 executes DNS queries to Google servers and collects the results, and collects
176 network statistics information from the output of netstat -s. It will also
177 attempt to connect to your proxy server if you have one configured. None of
178 this information will be sent to Google unless you choose to send it.
179 """)
class Error(Exception):
  """Root of the exception hierarchy defined by this module."""
class InvalidArgument(Error):
  """Signals that a function was called with an argument it cannot accept."""
192 def _DownloadWrapper(cls, arg, thread_state=None):
193 cls.Download(arg, thread_state=thread_state)
196 def _UploadWrapper(cls, arg, thread_state=None):
197 cls.Upload(arg, thread_state=thread_state)
200 def _DeleteWrapper(cls, arg, thread_state=None):
201 cls.Delete(arg, thread_state=thread_state)
204 def _PerfdiagExceptionHandler(cls, e):
205 """Simple exception handler to allow post-completion status."""
206 cls.logger.error(str(e))
209 def _DummyTrackerCallback(_):
210 pass
class DummyFile(object):
  """Sink with a file-like ``write`` method that discards whatever it gets."""

  def write(self, *args, **kwargs):  # pylint: disable=invalid-name
    """Accepts any arguments and drops them."""
    return None
220 # Many functions in perfdiag re-define a temporary function based on a
221 # variable from a loop, resulting in a false positive from the linter.
222 # pylint: disable=cell-var-from-loop
223 class PerfDiagCommand(Command):
224 """Implementation of gsutil perfdiag command."""
# Command specification. See base class for documentation.
command_spec = Command.CreateCommandSpec(
    'perfdiag',
    command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
    usage_synopsis=_SYNOPSIS,
    min_args=0,
    max_args=1,
    supported_sub_args='n:c:k:s:t:m:i:o:',
    file_url_ok=False,
    provider_url_ok=False,
    urls_start_arg=0,
    gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
    gs_default_api=ApiSelector.JSON,
    argparse_arguments=[
        CommandArgument.MakeNCloudBucketURLsArgument(1)
    ]
)
# Help specification. See help_provider.py for documentation.
help_spec = Command.HelpSpec(
    help_name='perfdiag',
    help_name_aliases=[],
    help_type='command_help',
    help_one_line_summary='Run performance diagnostic',
    help_text=_DETAILED_HELP_TEXT,
    subcommand_help_text={},
)

# Sizes, in bytes, of the files used by the latency tests.
# TODO: Consider letting the user specify these sizes with a configuration
# parameter.
test_file_sizes = (
    0,            # empty file
    1024,         # 1 KiB
    100 * 1024,   # 100 KiB
    1024 * 1024,  # 1 MiB
)

# Names of the individual diagnostic tests.
RTHRU = 'rthru'
WTHRU = 'wthru'
LAT = 'lat'
LIST = 'list'

# Every diagnostic test this command knows about.
ALL_DIAG_TESTS = (RTHRU, WTHRU, LAT, LIST)
# The subset of tests run when the user does not choose any.
DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)

# Google Cloud Storage XML API endpoint host.
XML_API_HOST = boto.config.get(
    'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
# Google Cloud Storage XML API endpoint port.
XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)

# Retry budget for requests that fail with a 5xx response.
MAX_SERVER_ERROR_RETRIES = 5
# Retry budget covering every retried failure, including more serious errors
# like the socket breaking.
MAX_TOTAL_RETRIES = 10

# The default buffer size in boto's Key object is set to 8 KiB. This becomes a
# bottleneck at high throughput rates, so we increase it (to 16 KiB).
KEY_BUFFER_SIZE = 16 * 1024

# The maximum number of bytes to generate pseudo-randomly before beginning
# to repeat bytes. This number was chosen as the next prime larger than 5 MiB.
MAX_UNIQUE_RANDOM_BYTES = 5 * 1024 * 1024 + 3

# Longest time, in seconds, the listing tests wait for object listings to
# reflect the expected state.
MAX_LISTING_WAIT_TIME = 60.0
298 def _Exec(self, cmd, raise_on_error=True, return_output=False,
299 mute_stderr=False):
300 """Executes a command in a subprocess.
302 Args:
303 cmd: List containing the command to execute.
304 raise_on_error: Whether or not to raise an exception when a process exits
305 with a non-zero return code.
306 return_output: If set to True, the return value of the function is the
307 stdout of the process.
308 mute_stderr: If set to True, the stderr of the process is not printed to
309 the console.
311 Returns:
312 The return code of the process or the stdout if return_output is set.
314 Raises:
315 Exception: If raise_on_error is set to True and any process exits with a
316 non-zero return code.
318 self.logger.debug('Running command: %s', cmd)
319 stderr = subprocess.PIPE if mute_stderr else None
320 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
321 (stdoutdata, _) = p.communicate()
322 if raise_on_error and p.returncode:
323 raise CommandException("Received non-zero return code (%d) from "
324 "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
325 return stdoutdata if return_output else p.returncode
def _SetUp(self):
  """Performs setup operations needed before diagnostics can be run.

  Resets the result and request bookkeeping, writes the local test files (one
  per latency test size, a 5 MiB TCP warmup file and the throughput file) and
  creates the sink used for downloads whose bytes are thrown away.
  """
  # Test result data, filled in by the individual tests.
  self.results = {}
  # Paths of the temporary on-disk files used by the latency tests.
  self.latency_files = []
  # Names of objects in the test bucket that must be cleaned up.
  self.test_object_names = set()
  # Test file path -> size in bytes.
  self.file_sizes = {}
  # Test file path -> contents as a string.
  self.file_contents = {}
  # Test file path -> MD5 hash.
  self.file_md5s = {}
  # Total number of HTTP requests made.
  self.total_requests = 0
  # Total number of HTTP 5xx errors.
  self.request_errors = 0
  # Number of responses, keyed by response code.
  self.error_responses_by_code = defaultdict(int)
  # Total number of socket errors.
  self.connection_breaks = 0

  def _MakeFile(file_size):
    """Creates a temporary file of the given size and returns its path.

    At most MAX_UNIQUE_RANDOM_BYTES random bytes are generated; a larger file
    repeats that data. The size, contents and MD5 of the file are recorded on
    self, keyed by the returned path.
    """
    handle, path = tempfile.mkstemp(suffix='.bin', prefix='gsutil_test_file',
                                    text=False)
    self.file_sizes[path] = file_size
    seed = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
    chunks = []
    remaining = file_size
    while remaining > 0:
      take = min(self.MAX_UNIQUE_RANDOM_BYTES, remaining)
      chunks.append(seed[:take])
      remaining -= take
    data = ''.join(chunks)
    self.file_contents[path] = data
    with os.fdopen(handle, 'wb') as out_file:
      out_file.write(data)
    # Hash what actually landed on disk.
    with open(path, 'rb') as in_file:
      self.file_md5s[path] = CalculateB64EncodedMd5FromContents(in_file)
    return path

  # Create files for latency tests.
  for latency_size in self.test_file_sizes:
    self.latency_files.append(_MakeFile(latency_size))

  # Creating a file for warming up the TCP connection.
  warmup_size = 5 * 1024 * 1024  # 5 Mebibytes.
  self.tcp_warmup_file = _MakeFile(warmup_size)
  # Remote file to use for TCP warmup.
  warmup_basename = os.path.basename(self.tcp_warmup_file)
  self.tcp_warmup_remote_file = str(self.bucket_url) + warmup_basename

  # Local file on disk for write throughput tests.
  self.thru_local_file = _MakeFile(self.thru_filesize)

  # Dummy file buffer to use for downloading that goes nowhere.
  self.discard_sink = DummyFile()
387 def _TearDown(self):
388 """Performs operations to clean things up after performing diagnostics."""
389 for fpath in self.latency_files + [self.thru_local_file,
390 self.tcp_warmup_file]:
391 try:
392 os.remove(fpath)
393 except OSError:
394 pass
396 for object_name in self.test_object_names:
398 def _Delete():
399 try:
400 self.gsutil_api.DeleteObject(self.bucket_url.bucket_name,
401 object_name,
402 provider=self.provider)
403 except NotFoundException:
404 pass
406 self._RunOperation(_Delete)
408 @contextlib.contextmanager
409 def _Time(self, key, bucket):
410 """A context manager that measures time.
412 A context manager that prints a status message before and after executing
413 the inner command and times how long the inner command takes. Keeps track of
414 the timing, aggregated by the given key.
416 Args:
417 key: The key to insert the timing value into a dictionary bucket.
418 bucket: A dictionary to place the timing value in.
420 Yields:
421 For the context manager.
423 self.logger.info('%s starting...', key)
424 t0 = time.time()
425 yield
426 t1 = time.time()
427 bucket[key].append(t1 - t0)
428 self.logger.info('%s done.', key)
430 def _RunOperation(self, func):
431 """Runs an operation with retry logic.
433 Args:
434 func: The function to run.
436 Returns:
437 True if the operation succeeds, False if aborted.
439 # We retry on httplib exceptions that can happen if the socket was closed
440 # by the remote party or the connection broke because of network issues.
441 # Only the BotoServerError is counted as a 5xx error towards the retry
442 # limit.
443 success = False
444 server_error_retried = 0
445 total_retried = 0
446 i = 0
447 return_val = None
448 while not success:
449 next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
450 try:
451 return_val = func()
452 self.total_requests += 1
453 success = True
454 except tuple(self.exceptions) as e:
455 total_retried += 1
456 if total_retried > self.MAX_TOTAL_RETRIES:
457 self.logger.info('Reached maximum total retries. Not retrying.')
458 break
459 if isinstance(e, ServiceException):
460 if e.status >= 500:
461 self.error_responses_by_code[e.status] += 1
462 self.total_requests += 1
463 self.request_errors += 1
464 server_error_retried += 1
465 time.sleep(next_sleep)
466 else:
467 raise
468 if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
469 self.logger.info(
470 'Reached maximum server error retries. Not retrying.')
471 break
472 else:
473 self.connection_breaks += 1
474 return return_val
def _RunLatencyTests(self):
  """Runs latency tests.

  For every iteration and every latency test file: uploads the file, fetches
  the object's metadata, downloads the object and deletes it. Each step is
  timed into self.results['latency'] under the keys UPLOAD_<size>,
  METADATA_<size>, DOWNLOAD_<size> and DELETE_<size>.
  """
  # Stores timing information for each category of operation.
  timings = defaultdict(list)
  self.results['latency'] = timings

  for iteration in range(self.num_iterations):
    self.logger.info('\nRunning latency iteration %d...', iteration + 1)
    for fpath in self.latency_files:
      url = self.bucket_url.Clone()
      url.object_name = os.path.basename(fpath)
      file_size = self.file_sizes[fpath]

      self.logger.info(
          "\nFile of size %s located on disk at '%s' being diagnosed in the "
          "cloud at '%s'.", MakeHumanReadable(file_size), fpath, url)

      upload_target = StorageUrlToUploadObjectMetadata(url)

      def _Upload():
        """Uploads the in-memory copy of the file, timing the API call."""
        source = cStringIO.StringIO(self.file_contents[fpath])
        with self._Time('UPLOAD_%d' % file_size, timings):
          self.gsutil_api.UploadObject(
              source, upload_target, size=file_size, provider=self.provider,
              fields=['name'])
      self._RunOperation(_Upload)

      def _Metadata():
        """Fetches and returns the object's metadata, timing the API call."""
        wanted_fields = ['name', 'contentType', 'mediaLink', 'size']
        with self._Time('METADATA_%d' % file_size, timings):
          return self.gsutil_api.GetObjectMetadata(
              url.bucket_name, url.object_name,
              provider=self.provider, fields=wanted_fields)
      # Download will get the metadata first if we don't pass it in.
      object_metadata = self._RunOperation(_Metadata)
      serialization_data = json.dumps(
          GetDownloadSerializationDict(object_metadata))

      def _Download():
        """Downloads the object into the discard sink, timing the API call."""
        with self._Time('DOWNLOAD_%d' % file_size, timings):
          self.gsutil_api.GetObjectMedia(
              url.bucket_name, url.object_name, self.discard_sink,
              provider=self.provider, serialization_data=serialization_data)
      self._RunOperation(_Download)

      def _Delete():
        """Deletes the object, timing the API call."""
        with self._Time('DELETE_%d' % file_size, timings):
          self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
                                       provider=self.provider)
      self._RunOperation(_Delete)
527 class _CpFilter(logging.Filter):
529 def filter(self, record):
530 # Used to prevent cp._LogCopyOperation from spewing output from
531 # subprocesses about every iteration.
532 msg = record.getMessage()
533 return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
534 ('Computing CRC' in msg))
536 def _PerfdiagExceptionHandler(self, e):
537 """Simple exception handler to allow post-completion status."""
538 self.logger.error(str(e))
def _RunReadThruTests(self):
  """Runs read throughput tests.

  Uploads the TCP warmup object and the throughput object, then downloads the
  throughput object num_iterations times. With one process and one thread the
  downloads run one after another in this process, after a warmup download,
  and the individual download times are summed. Otherwise the downloads are
  dispatched through Apply and the wall-clock time of the whole batch is
  used. The outcome is stored in self.results['read_throughput'].
  """
  self.logger.info(
      '\nRunning read throughput tests (%s iterations of size %s)' %
      (self.num_iterations, MakeHumanReadable(self.thru_filesize)))

  read_results = {'file_size': self.thru_filesize,
                  'num_times': self.num_iterations,
                  'processes': self.processes,
                  'threads': self.threads}
  self.results['read_throughput'] = read_results

  # Copy the TCP warmup file.
  warmup_url = self.bucket_url.Clone()
  warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
  warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
  self.test_object_names.add(warmup_url.object_name)

  def _Upload1():
    """Uploads the warmup object."""
    warmup_stream = cStringIO.StringIO(
        self.file_contents[self.tcp_warmup_file])
    self.gsutil_api.UploadObject(
        warmup_stream, warmup_target, provider=self.provider,
        fields=['name'])
  self._RunOperation(_Upload1)

  # Copy the file to remote location before reading.
  thru_url = self.bucket_url.Clone()
  thru_url.object_name = os.path.basename(self.thru_local_file)
  thru_target = StorageUrlToUploadObjectMetadata(thru_url)
  thru_target.md5Hash = self.file_md5s[self.thru_local_file]
  self.test_object_names.add(thru_url.object_name)

  # Get the mediaLink here so that we can pass it to download.
  def _Upload2():
    """Uploads the throughput object and returns the requested metadata."""
    thru_stream = cStringIO.StringIO(
        self.file_contents[self.thru_local_file])
    return self.gsutil_api.UploadObject(
        thru_stream, thru_target, provider=self.provider,
        size=self.thru_filesize, fields=['name', 'mediaLink', 'size'])

  # Get the metadata for the object so that we are just measuring performance
  # on the actual bytes transfer.
  uploaded_metadata = self._RunOperation(_Upload2)
  serialization_data = json.dumps(
      GetDownloadSerializationDict(uploaded_metadata))

  run_in_process = self.processes == 1 and self.threads == 1
  if run_in_process:

    # Warm up the TCP connection.
    def _Warmup():
      self.gsutil_api.GetObjectMedia(warmup_url.bucket_name,
                                     warmup_url.object_name,
                                     self.discard_sink,
                                     provider=self.provider)
    self._RunOperation(_Warmup)

    durations = []

    def _Download():
      """Downloads the throughput object once and records the duration."""
      started = time.time()
      self.gsutil_api.GetObjectMedia(
          thru_url.bucket_name, thru_url.object_name, self.discard_sink,
          provider=self.provider, serialization_data=serialization_data)
      durations.append(time.time() - started)
    for _ in range(self.num_iterations):
      self._RunOperation(_Download)
    time_took = sum(durations)
  else:
    download_args = [(thru_url.bucket_name, thru_url.object_name,
                      serialization_data)] * self.num_iterations
    self.logger.addFilter(self._CpFilter())

    batch_started = time.time()
    self.Apply(_DownloadWrapper,
               download_args,
               _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker,
               parallel_operations_override=True,
               process_count=self.processes,
               thread_count=self.threads)
    time_took = time.time() - batch_started

  total_bytes_copied = self.thru_filesize * self.num_iterations
  bytes_per_second = total_bytes_copied / time_took

  read_results['time_took'] = time_took
  read_results['total_bytes_copied'] = total_bytes_copied
  read_results['bytes_per_second'] = bytes_per_second
def _RunWriteThruTests(self):
  """Runs write throughput tests.

  Uploads the throughput file num_iterations times, each time under a distinct
  object name. With one process and one thread the uploads run one after
  another in this process, after a warmup upload, and the individual upload
  times are summed. Otherwise the uploads are dispatched through Apply and the
  wall-clock time of the whole batch is used. The outcome is stored in
  self.results['write_throughput'].
  """
  self.logger.info(
      '\nRunning write throughput tests (%s iterations of size %s)' %
      (self.num_iterations, MakeHumanReadable(self.thru_filesize)))

  self.results['write_throughput'] = {'file_size': self.thru_filesize,
                                      'num_copies': self.num_iterations,
                                      'processes': self.processes,
                                      'threads': self.threads}

  warmup_url = self.bucket_url.Clone()
  warmup_url.object_name = os.path.basename(self.tcp_warmup_file)
  warmup_target = StorageUrlToUploadObjectMetadata(warmup_url)
  self.test_object_names.add(warmup_url.object_name)

  thru_url = self.bucket_url.Clone()
  thru_url.object_name = os.path.basename(self.thru_local_file)
  thru_target = StorageUrlToUploadObjectMetadata(thru_url)
  thru_tuples = []
  for i in range(self.num_iterations):
    # Create a unique name for each uploaded object. Otherwise,
    # the XML API would fail when trying to non-atomically get metadata
    # for the object that gets blown away by the overwrite.
    remote_object_name = thru_target.name + str(i)
    self.test_object_names.add(remote_object_name)
    thru_tuples.append(UploadObjectTuple(thru_target.bucket,
                                         remote_object_name,
                                         filepath=self.thru_local_file))

  if self.processes == 1 and self.threads == 1:
    # Warm up the TCP connection.
    def _Warmup():
      # The declared size must describe the warmup file being streamed, not
      # the throughput file.
      self.gsutil_api.UploadObject(
          cStringIO.StringIO(self.file_contents[self.tcp_warmup_file]),
          warmup_target, provider=self.provider,
          size=self.file_sizes[self.tcp_warmup_file], fields=['name'])
    self._RunOperation(_Warmup)

    times = []

    for thru_tuple in thru_tuples:
      def _Upload():
        """Uploads the write throughput measurement object."""
        upload_target = apitools_messages.Object(
            bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
            md5Hash=thru_tuple.md5)
        io_fp = cStringIO.StringIO(self.file_contents[self.thru_local_file])
        t0 = time.time()
        if self.thru_filesize < ResumableThreshold():
          self.gsutil_api.UploadObject(
              io_fp, upload_target, provider=self.provider,
              size=self.thru_filesize, fields=['name'])
        else:
          self.gsutil_api.UploadObjectResumable(
              io_fp, upload_target, provider=self.provider,
              size=self.thru_filesize, fields=['name'],
              tracker_callback=_DummyTrackerCallback)

        t1 = time.time()
        times.append(t1 - t0)

      # _Upload is run before the loop variable changes, so the closure sees
      # the intended thru_tuple.
      self._RunOperation(_Upload)
    time_took = sum(times)

  else:
    args = thru_tuples
    t0 = time.time()
    self.Apply(_UploadWrapper,
               args,
               _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker,
               parallel_operations_override=True,
               process_count=self.processes,
               thread_count=self.threads)
    t1 = time.time()
    time_took = t1 - t0

  total_bytes_copied = self.thru_filesize * self.num_iterations
  bytes_per_second = total_bytes_copied / time_took

  self.results['write_throughput']['time_took'] = time_took
  self.results['write_throughput']['total_bytes_copied'] = total_bytes_copied
  self.results['write_throughput']['bytes_per_second'] = bytes_per_second
def _RunListTests(self):
  """Runs eventual consistency listing latency tests.

  Writes num_iterations empty objects with random names, lists the bucket
  until all of them show up, deletes them, then lists until none of them
  shows up any more. Each phase stops early once MAX_LISTING_WAIT_TIME is
  exceeded. Per-call listing latencies and progress are stored under
  self.results['listing']['insert'] and self.results['listing']['delete'].
  """
  self.results['listing'] = {'num_files': self.num_iterations}

  # Generate N random object names to put in the bucket.
  list_prefix = 'gsutil-perfdiag-list-'
  list_objects = [
      u'%s%s' % (list_prefix, os.urandom(20).encode('hex'))
      for _ in xrange(self.num_iterations)]
  self.test_object_names.update(list_objects)

  # Add the objects to the bucket.
  self.logger.info(
      '\nWriting %s objects for listing test...', self.num_iterations)
  empty_md5 = CalculateB64EncodedMd5FromContents(cStringIO.StringIO(''))
  args = []
  for name in list_objects:
    args.append(UploadObjectTuple(self.bucket_url.bucket_name, name,
                                  md5=empty_md5, contents=''))
  self.Apply(_UploadWrapper, args, _PerfdiagExceptionHandler,
             arg_checker=DummyArgChecker)

  def _List(latencies):
    """Lists the test prefix, records the call latency, returns the names."""
    started = time.time()
    listing = list(self.gsutil_api.ListObjects(
        self.bucket_url.bucket_name, prefix=list_prefix, delimiter='/',
        provider=self.provider, fields=['items/name']))
    latencies.append(time.time() - started)
    return set([entry.data.name for entry in listing])

  # Phase 1: wait for the uploaded objects to appear in listings.
  insert_latencies = []
  insert_files_seen = []
  insert_start_time = time.time()
  expected_objects = set(list_objects)
  found_objects = set()

  def _ListAfterUpload():
    found_objects.update(_List(insert_latencies) & expected_objects)
    insert_files_seen.append(len(found_objects))

  self.logger.info(
      'Listing bucket %s waiting for %s objects to appear...',
      self.bucket_url.bucket_name, self.num_iterations)
  while expected_objects - found_objects:
    self._RunOperation(_ListAfterUpload)
    if (expected_objects - found_objects and
        time.time() - insert_start_time > self.MAX_LISTING_WAIT_TIME):
      self.logger.warning('Maximum time reached waiting for listing.')
      break
  insert_end_time = time.time()

  self.results['listing']['insert'] = {
      'num_listing_calls': len(insert_latencies),
      'list_latencies': insert_latencies,
      'files_seen_after_listing': insert_files_seen,
      'time_took': insert_end_time - insert_start_time,
  }

  self.logger.info(
      'Deleting %s objects for listing test...', self.num_iterations)
  self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
             arg_checker=DummyArgChecker)

  # Phase 2: wait for the deleted objects to vanish from listings.
  self.logger.info(
      'Listing bucket %s waiting for %s objects to disappear...',
      self.bucket_url.bucket_name, self.num_iterations)
  delete_latencies = []
  delete_files_seen = []
  delete_start_time = time.time()
  lingering_objects = set(list_objects)

  def _ListAfterDelete():
    lingering_objects.intersection_update(_List(delete_latencies))
    delete_files_seen.append(len(lingering_objects))

  while lingering_objects:
    self._RunOperation(_ListAfterDelete)
    if (lingering_objects and
        time.time() - delete_start_time > self.MAX_LISTING_WAIT_TIME):
      self.logger.warning('Maximum time reached waiting for listing.')
      break
  delete_end_time = time.time()

  self.results['listing']['delete'] = {
      'num_listing_calls': len(delete_latencies),
      'list_latencies': delete_latencies,
      'files_seen_after_listing': delete_files_seen,
      'time_took': delete_end_time - delete_start_time,
  }
def Upload(self, thru_tuple, thread_state=None):
  """Uploads the object described by thru_tuple.

  Args:
    thru_tuple: Describes the upload through its bucket_name, object_name,
        filepath, md5 and contents attributes. When filepath is set, the
        contents and MD5 stored for that local test file are used instead of
        the tuple's own md5 and contents.
    thread_state: Passed to GetCloudApiInstance to obtain the API instance.
  """
  gsutil_api = GetCloudApiInstance(self, thread_state)

  md5hash = thru_tuple.md5
  contents = thru_tuple.contents
  local_path = thru_tuple.filepath
  if local_path:
    md5hash = self.file_md5s[local_path]
    contents = self.file_contents[local_path]

  upload_target = apitools_messages.Object(
      bucket=thru_tuple.bucket_name, name=thru_tuple.object_name,
      md5Hash=md5hash)
  file_size = len(contents)
  upload_kwargs = {'provider': self.provider, 'size': file_size,
                   'fields': ['name']}
  # Payloads at or above the resumable threshold use the resumable call.
  if file_size < ResumableThreshold():
    upload = gsutil_api.UploadObject
  else:
    upload = gsutil_api.UploadObjectResumable
    upload_kwargs['tracker_callback'] = _DummyTrackerCallback
  upload(cStringIO.StringIO(contents), upload_target, **upload_kwargs)
def Download(self, download_tuple, thread_state=None):
  """Downloads a file into the discard sink.

  Args:
    download_tuple: (bucket name, object name, serialization data for object).
    thread_state: gsutil Cloud API instance to use for the download.
  """
  gsutil_api = GetCloudApiInstance(self, thread_state)
  bucket_name = download_tuple[0]
  object_name = download_tuple[1]
  serialization_data = download_tuple[2]
  gsutil_api.GetObjectMedia(
      bucket_name, object_name, self.discard_sink,
      provider=self.provider, serialization_data=serialization_data)
def Delete(self, thru_tuple, thread_state=None):
  """Deletes the object named by thru_tuple.

  Args:
    thru_tuple: Object with bucket_name and object_name attributes.
    thread_state: API instance to use; self.gsutil_api is used when this is
        falsy.
  """
  gsutil_api = thread_state
  if not gsutil_api:
    gsutil_api = self.gsutil_api
  gsutil_api.DeleteObject(
      thru_tuple.bucket_name, thru_tuple.object_name, provider=self.provider)
845 def _GetDiskCounters(self):
846 """Retrieves disk I/O statistics for all disks.
848 Adapted from the psutil module's psutil._pslinux.disk_io_counters:
849 http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py
851 Originally distributed under under a BSD license.
852 Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.
854 Returns:
855 A dictionary containing disk names mapped to the disk counters from
856 /disk/diskstats.
858 # iostat documentation states that sectors are equivalent with blocks and
859 # have a size of 512 bytes since 2.4 kernels. This value is needed to
860 # calculate the amount of disk I/O in bytes.
861 sector_size = 512
863 partitions = []
864 with open('/proc/partitions', 'r') as f:
865 lines = f.readlines()[2:]
866 for line in lines:
867 _, _, _, name = line.split()
868 if name[-1].isdigit():
869 partitions.append(name)
871 retdict = {}
872 with open('/proc/diskstats', 'r') as f:
873 for line in f:
874 values = line.split()[:11]
875 _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values
876 if name in partitions:
877 rbytes = int(rbytes) * sector_size
878 wbytes = int(wbytes) * sector_size
879 reads = int(reads)
880 writes = int(writes)
881 rtime = int(rtime)
882 wtime = int(wtime)
883 retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime)
884 return retdict
886 def _GetTcpStats(self):
887 """Tries to parse out TCP packet information from netstat output.
889 Returns:
890 A dictionary containing TCP information, or None if netstat is not
891 available.
893 # netstat return code is non-zero for -s on Linux, so don't raise on error.
894 try:
895 netstat_output = self._Exec(['netstat', '-s'], return_output=True,
896 raise_on_error=False)
897 except OSError:
898 self.logger.warning('netstat not found on your system; some measurement '
899 'data will be missing')
900 return None
901 netstat_output = netstat_output.strip().lower()
902 found_tcp = False
903 tcp_retransmit = None
904 tcp_received = None
905 tcp_sent = None
906 for line in netstat_output.split('\n'):
907 # Header for TCP section is "Tcp:" in Linux/Mac and
908 # "TCP Statistics for" in Windows.
909 if 'tcp:' in line or 'tcp statistics' in line:
910 found_tcp = True
912 # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts"
913 # Windows == "segments retransmitted".
914 if (found_tcp and tcp_retransmit is None and
915 ('segments retransmited' in line or 'retransmit timeouts' in line or
916 'segments retransmitted' in line)):
917 tcp_retransmit = ''.join(c for c in line if c in string.digits)
919 # Linux+Windows == "segments received", Mac == "packets received".
920 if (found_tcp and tcp_received is None and
921 ('segments received' in line or 'packets received' in line)):
922 tcp_received = ''.join(c for c in line if c in string.digits)
924 # Linux == "segments send out" (sic), Mac+Windows == "packets sent".
925 if (found_tcp and tcp_sent is None and
926 ('segments send out' in line or 'packets sent' in line or
927 'segments sent' in line)):
928 tcp_sent = ''.join(c for c in line if c in string.digits)
930 result = {}
931 try:
932 result['tcp_retransmit'] = int(tcp_retransmit)
933 result['tcp_received'] = int(tcp_received)
934 result['tcp_sent'] = int(tcp_sent)
935 except (ValueError, TypeError):
936 result['tcp_retransmit'] = None
937 result['tcp_received'] = None
938 result['tcp_sent'] = None
940 return result
def _CollectSysInfo(self):
  """Collects system information and stores it in self.results['sysinfo'].

  Gathers, on a best-effort basis: Boto HTTPS and proxy settings, the local
  IP address, the temporary directory, a GMT timestamp, DNS and connection
  details for self.XML_API_HOST, proxy latencies, CPU count, load average,
  memory figures from /proc/meminfo, the upper-case attributes of the config
  module and selected TCP settings from /proc/sys/net. A lookup that fails
  is recorded as an empty/None value or simply left out; it never aborts the
  run.
  """
  sysinfo = {}

  # All exceptions that might be raised from socket module calls.
  socket_errors = (
      socket.error, socket.herror, socket.gaierror, socket.timeout)

  def _ConnectLatency(address):
    """Returns the seconds taken to connect to address, or None on error."""
    sock = None
    try:
      sock = socket.socket()
      t0 = time.time()
      sock.connect(address)
      return time.time() - t0
    except socket_errors:
      return None
    finally:
      # The probe only needs the timing, so always release the descriptor.
      if sock is not None:
        sock.close()

  # Find out whether HTTPS is enabled in Boto.
  sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)

  # Look up proxy info.
  proxy_host = boto.config.get('Boto', 'proxy', None)
  proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
  sysinfo['using_proxy'] = bool(proxy_host)

  if boto.config.get('Boto', 'proxy_rdns', False):
    self.logger.info('DNS lookups are disallowed in this environment, so '
                     'some information is not included in this perfdiag run.')

  # Get the local IP address from socket lib.
  try:
    sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
  except socket_errors:
    sysinfo['ip_address'] = ''
  # Record the temporary directory used since it can affect performance, e.g.
  # when on a networked filesystem.
  sysinfo['tempdir'] = tempfile.gettempdir()

  # Produces an RFC 2822 compliant GMT timestamp.
  sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
                                           time.gmtime())

  # Execute a CNAME lookup on Google DNS to find what Google server
  # it's routing to.
  cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
  try:
    nslookup_cname_output = self._Exec(cmd, return_output=True)
    m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
    sysinfo['googserv_route'] = m.group('googserv') if m else None
  except (CommandException, OSError):
    sysinfo['googserv_route'] = ''

  # Try to determine the latency of a DNS lookup for the Google hostname
  # endpoint. Note: we don't piggyback on gethostbyname_ex below because
  # the _ex version requires an extra RTT.
  try:
    t0 = time.time()
    socket.gethostbyname(self.XML_API_HOST)
    t1 = time.time()
    sysinfo['google_host_dns_latency'] = t1 - t0
  except socket_errors:
    pass

  # Look up IP addresses for Google Server.
  try:
    _, _, googserv_ips = socket.gethostbyname_ex(self.XML_API_HOST)
  except socket_errors:
    googserv_ips = []
  sysinfo['googserv_ips'] = googserv_ips

  # Reverse lookup the hostnames for the Google Server IPs. The address list
  # returned by gethostbyaddr is deliberately discarded so that it cannot
  # replace the forward-resolved list used by the latency probe below.
  sysinfo['googserv_hostnames'] = []
  for googserv_ip in googserv_ips:
    try:
      hostname, _, _ = socket.gethostbyaddr(googserv_ip)
      sysinfo['googserv_hostnames'].append(hostname)
    except socket_errors:
      pass

  # Query o-o to find out what the Google DNS thinks is the user's IP.
  try:
    cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.']
    nslookup_txt_output = self._Exec(cmd, return_output=True)
    m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
    sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
  except (CommandException, OSError):
    sysinfo['dns_o-o_ip'] = ''

  # Try to determine the latency of connecting to the Google hostname
  # endpoint, once per forward-resolved IP address.
  sysinfo['google_host_connect_latencies'] = {}
  for googserv_ip in googserv_ips:
    latency = _ConnectLatency((googserv_ip, self.XML_API_PORT))
    if latency is not None:
      sysinfo['google_host_connect_latencies'][googserv_ip] = latency

  # If using a proxy, try to determine the latency of a DNS lookup to resolve
  # the proxy hostname and the latency of connecting to the proxy.
  if proxy_host:
    proxy_ip = None
    try:
      t0 = time.time()
      proxy_ip = socket.gethostbyname(proxy_host)
      t1 = time.time()
      sysinfo['proxy_dns_latency'] = t1 - t0
    except socket_errors:
      pass

    # Fall back to the host name when it could not be resolved above.
    latency = _ConnectLatency((proxy_ip or proxy_host, proxy_port))
    if latency is not None:
      sysinfo['proxy_host_connect_latency'] = latency

  # Try and find the number of CPUs in the system if available.
  try:
    sysinfo['cpu_count'] = multiprocessing.cpu_count()
  except NotImplementedError:
    sysinfo['cpu_count'] = None

  # For *nix platforms, obtain the CPU load.
  try:
    sysinfo['load_avg'] = list(os.getloadavg())
  except (AttributeError, OSError):
    sysinfo['load_avg'] = None

  # Try and collect memory information from /proc/meminfo if possible.
  # NOTE(review): the digits on each line are scaled by 1000, as before;
  # confirm whether 1024 was intended for the "kB" figures.
  meminfo_prefixes = (
      ('MemTotal', 'mem_total'),
      ('MemFree', 'mem_free'),
      ('Buffers', 'mem_buffers'),
      ('Cached', 'mem_cached'),
  )
  meminfo = dict.fromkeys(key for _, key in meminfo_prefixes)
  try:
    with open('/proc/meminfo', 'r') as f:
      for line in f:
        for prefix, key in meminfo_prefixes:
          if line.startswith(prefix):
            meminfo[key] = (
                int(''.join(c for c in line if c in string.digits)) * 1000)
            break
  except (IOError, ValueError):
    pass
  sysinfo['meminfo'] = meminfo

  # Get configuration attributes from config module.
  sysinfo['gsutil_config'] = {}
  for attr in dir(config):
    attr_value = getattr(config, attr)
    # Filter out multiline strings that are not useful.
    if attr.isupper() and not (isinstance(attr_value, basestring) and
                               '\n' in attr_value):
      sysinfo['gsutil_config'][attr] = attr_value

  sysinfo['tcp_proc_values'] = {}
  stats_to_check = [
      '/proc/sys/net/core/rmem_default',
      '/proc/sys/net/core/rmem_max',
      '/proc/sys/net/core/wmem_default',
      '/proc/sys/net/core/wmem_max',
      '/proc/sys/net/ipv4/tcp_timestamps',
      '/proc/sys/net/ipv4/tcp_sack',
      '/proc/sys/net/ipv4/tcp_window_scaling',
  ]
  for fname in stats_to_check:
    try:
      with open(fname, 'r') as f:
        value = f.read()
      sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip()
    except IOError:
      # Not every platform exposes these files; skip the ones that are absent.
      pass

  self.results['sysinfo'] = sysinfo
1126 def _DisplayStats(self, trials):
1127 """Prints out mean, standard deviation, median, and 90th percentile."""
1128 n = len(trials)
1129 mean = float(sum(trials)) / n
1130 stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n)
1132 print str(n).rjust(6), '',
1133 print ('%.1f' % (mean * 1000)).rjust(9), '',
1134 print ('%.1f' % (stdev * 1000)).rjust(12), '',
1135 print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '',
1136 print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), ''
1138 def _DisplayResults(self):
1139 """Displays results collected from diagnostic run."""
1140 print
1141 print '=' * 78
1142 print 'DIAGNOSTIC RESULTS'.center(78)
1143 print '=' * 78
1145 if 'latency' in self.results:
1146 print
1147 print '-' * 78
1148 print 'Latency'.center(78)
1149 print '-' * 78
1150 print ('Operation Size Trials Mean (ms) Std Dev (ms) '
1151 'Median (ms) 90th % (ms)')
1152 print ('========= ========= ====== ========= ============ '
1153 '=========== ===========')
1154 for key in sorted(self.results['latency']):
1155 trials = sorted(self.results['latency'][key])
1156 op, numbytes = key.split('_')
1157 numbytes = int(numbytes)
1158 if op == 'METADATA':
1159 print 'Metadata'.rjust(9), '',
1160 print MakeHumanReadable(numbytes).rjust(9), '',
1161 self._DisplayStats(trials)
1162 if op == 'DOWNLOAD':
1163 print 'Download'.rjust(9), '',
1164 print MakeHumanReadable(numbytes).rjust(9), '',
1165 self._DisplayStats(trials)
1166 if op == 'UPLOAD':
1167 print 'Upload'.rjust(9), '',
1168 print MakeHumanReadable(numbytes).rjust(9), '',
1169 self._DisplayStats(trials)
1170 if op == 'DELETE':
1171 print 'Delete'.rjust(9), '',
1172 print MakeHumanReadable(numbytes).rjust(9), '',
1173 self._DisplayStats(trials)
1175 if 'write_throughput' in self.results:
1176 print
1177 print '-' * 78
1178 print 'Write Throughput'.center(78)
1179 print '-' * 78
1180 write_thru = self.results['write_throughput']
1181 print 'Copied a %s file %d times for a total transfer size of %s.' % (
1182 MakeHumanReadable(write_thru['file_size']),
1183 write_thru['num_copies'],
1184 MakeHumanReadable(write_thru['total_bytes_copied']))
1185 print 'Write throughput: %s/s.' % (
1186 MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
1188 if 'read_throughput' in self.results:
1189 print
1190 print '-' * 78
1191 print 'Read Throughput'.center(78)
1192 print '-' * 78
1193 read_thru = self.results['read_throughput']
1194 print 'Copied a %s file %d times for a total transfer size of %s.' % (
1195 MakeHumanReadable(read_thru['file_size']),
1196 read_thru['num_times'],
1197 MakeHumanReadable(read_thru['total_bytes_copied']))
1198 print 'Read throughput: %s/s.' % (
1199 MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
1201 if 'listing' in self.results:
1202 print
1203 print '-' * 78
1204 print 'Listing'.center(78)
1205 print '-' * 78
1207 listing = self.results['listing']
1208 insert = listing['insert']
1209 delete = listing['delete']
1210 print 'After inserting %s objects:' % listing['num_files']
1211 print (' Total time for objects to appear: %.2g seconds' %
1212 insert['time_took'])
1213 print ' Number of listing calls made: %s' % insert['num_listing_calls']
1214 print (' Individual listing call latencies: [%s]' %
1215 ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
1216 print (' Files reflected after each call: [%s]' %
1217 ', '.join(map(str, insert['files_seen_after_listing'])))
1219 print 'After deleting %s objects:' % listing['num_files']
1220 print (' Total time for objects to appear: %.2g seconds' %
1221 delete['time_took'])
1222 print ' Number of listing calls made: %s' % delete['num_listing_calls']
1223 print (' Individual listing call latencies: [%s]' %
1224 ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
1225 print (' Files reflected after each call: [%s]' %
1226 ', '.join(map(str, delete['files_seen_after_listing'])))
1228 if 'sysinfo' in self.results:
1229 print
1230 print '-' * 78
1231 print 'System Information'.center(78)
1232 print '-' * 78
1233 info = self.results['sysinfo']
1234 print 'IP Address: \n %s' % info['ip_address']
1235 print 'Temporary Directory: \n %s' % info['tempdir']
1236 print 'Bucket URI: \n %s' % self.results['bucket_uri']
1237 print 'gsutil Version: \n %s' % self.results.get('gsutil_version',
1238 'Unknown')
1239 print 'boto Version: \n %s' % self.results.get('boto_version', 'Unknown')
1241 if 'gmt_timestamp' in info:
1242 ts_string = info['gmt_timestamp']
1243 timetuple = None
1244 try:
1245 # Convert RFC 2822 string to Linux timestamp.
1246 timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000')
1247 except ValueError:
1248 pass
1250 if timetuple:
1251 # Converts the GMT time tuple to local Linux timestamp.
1252 localtime = calendar.timegm(timetuple)
1253 localdt = datetime.datetime.fromtimestamp(localtime)
1254 print 'Measurement time: \n %s' % localdt.strftime(
1255 '%Y-%m-%d %I:%M:%S %p %Z')
1257 print 'Google Server: \n %s' % info['googserv_route']
1258 print ('Google Server IP Addresses: \n %s' %
1259 ('\n '.join(info['googserv_ips'])))
1260 print ('Google Server Hostnames: \n %s' %
1261 ('\n '.join(info['googserv_hostnames'])))
1262 print 'Google DNS thinks your IP is: \n %s' % info['dns_o-o_ip']
1263 print 'CPU Count: \n %s' % info['cpu_count']
1264 print 'CPU Load Average: \n %s' % info['load_avg']
1265 try:
1266 print ('Total Memory: \n %s' %
1267 MakeHumanReadable(info['meminfo']['mem_total']))
1268 # Free memory is really MemFree + Buffers + Cached.
1269 print 'Free Memory: \n %s' % MakeHumanReadable(
1270 info['meminfo']['mem_free'] +
1271 info['meminfo']['mem_buffers'] +
1272 info['meminfo']['mem_cached'])
1273 except TypeError:
1274 pass
1276 if 'netstat_end' in info and 'netstat_start' in info:
1277 netstat_after = info['netstat_end']
1278 netstat_before = info['netstat_start']
1279 for tcp_type in ('sent', 'received', 'retransmit'):
1280 try:
1281 delta = (netstat_after['tcp_%s' % tcp_type] -
1282 netstat_before['tcp_%s' % tcp_type])
1283 print 'TCP segments %s during test:\n %d' % (tcp_type, delta)
1284 except TypeError:
1285 pass
1286 else:
1287 print ('TCP segment counts not available because "netstat" was not '
1288 'found during test runs')
1290 if 'disk_counters_end' in info and 'disk_counters_start' in info:
1291 print 'Disk Counter Deltas:\n',
1292 disk_after = info['disk_counters_end']
1293 disk_before = info['disk_counters_start']
1294 print '', 'disk'.rjust(6),
1295 for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime',
1296 'wtime']:
1297 print colname.rjust(8),
1298 print
1299 for diskname in sorted(disk_after):
1300 before = disk_before[diskname]
1301 after = disk_after[diskname]
1302 (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before
1303 (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after
1304 print '', diskname.rjust(6),
1305 deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1,
1306 wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1]
1307 for delta in deltas:
1308 print str(delta).rjust(8),
1309 print
1311 if 'tcp_proc_values' in info:
1312 print 'TCP /proc values:\n',
1313 for item in info['tcp_proc_values'].iteritems():
1314 print ' %s = %s' % item
1316 if 'boto_https_enabled' in info:
1317 print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled']
1319 if 'using_proxy' in info:
1320 print 'Requests routed through proxy: \n %s' % info['using_proxy']
1322 if 'google_host_dns_latency' in info:
1323 print ('Latency of the DNS lookup for Google Storage server (ms): '
1324 '\n %.1f' % (info['google_host_dns_latency'] * 1000.0))
1326 if 'google_host_connect_latencies' in info:
1327 print 'Latencies connecting to Google Storage server IPs (ms):'
1328 for ip, latency in info['google_host_connect_latencies'].iteritems():
1329 print ' %s = %.1f' % (ip, latency * 1000.0)
1331 if 'proxy_dns_latency' in info:
1332 print ('Latency of the DNS lookup for the configured proxy (ms): '
1333 '\n %.1f' % (info['proxy_dns_latency'] * 1000.0))
1335 if 'proxy_host_connect_latency' in info:
1336 print ('Latency connecting to the configured proxy (ms): \n %.1f' %
1337 (info['proxy_host_connect_latency'] * 1000.0))
1339 if 'request_errors' in self.results and 'total_requests' in self.results:
1340 print
1341 print '-' * 78
1342 print 'In-Process HTTP Statistics'.center(78)
1343 print '-' * 78
1344 total = int(self.results['total_requests'])
1345 numerrors = int(self.results['request_errors'])
1346 numbreaks = int(self.results['connection_breaks'])
1347 availability = (((total - numerrors) / float(total)) * 100
1348 if total > 0 else 100)
1349 print 'Total HTTP requests made: %d' % total
1350 print 'HTTP 5xx errors: %d' % numerrors
1351 print 'HTTP connections broken: %d' % numbreaks
1352 print 'Availability: %.7g%%' % availability
1353 if 'error_responses_by_code' in self.results:
1354 sorted_codes = sorted(
1355 self.results['error_responses_by_code'].iteritems())
1356 if sorted_codes:
1357 print 'Error responses by code:'
1358 print '\n'.join(' %s: %s' % c for c in sorted_codes)
1360 if self.output_file:
1361 with open(self.output_file, 'w') as f:
1362 json.dump(self.results, f, indent=2)
1363 print
1364 print "Output file written to '%s'." % self.output_file
1366 print
1368 def _ParsePositiveInteger(self, val, msg):
1369 """Tries to convert val argument to a positive integer.
1371 Args:
1372 val: The value (as a string) to convert to a positive integer.
1373 msg: The error message to place in the CommandException on an error.
1375 Returns:
1376 A valid positive integer.
1378 Raises:
1379 CommandException: If the supplied value is not a valid positive integer.
1381 try:
1382 val = int(val)
1383 if val < 1:
1384 raise CommandException(msg)
1385 return val
1386 except ValueError:
1387 raise CommandException(msg)
1389 def _ParseArgs(self):
1390 """Parses arguments for perfdiag command."""
1391 # From -n.
1392 self.num_iterations = 5
1393 # From -c.
1394 self.processes = 1
1395 # From -k.
1396 self.threads = 1
1397 # From -s.
1398 self.thru_filesize = 1048576
1399 # From -t.
1400 self.diag_tests = self.DEFAULT_DIAG_TESTS
1401 # From -o.
1402 self.output_file = None
1403 # From -i.
1404 self.input_file = None
1405 # From -m.
1406 self.metadata_keys = {}
1408 if self.sub_opts:
1409 for o, a in self.sub_opts:
1410 if o == '-n':
1411 self.num_iterations = self._ParsePositiveInteger(
1412 a, 'The -n parameter must be a positive integer.')
1413 if o == '-c':
1414 self.processes = self._ParsePositiveInteger(
1415 a, 'The -c parameter must be a positive integer.')
1416 if o == '-k':
1417 self.threads = self._ParsePositiveInteger(
1418 a, 'The -k parameter must be a positive integer.')
1419 if o == '-s':
1420 try:
1421 self.thru_filesize = HumanReadableToBytes(a)
1422 except ValueError:
1423 raise CommandException('Invalid -s parameter.')
1424 if self.thru_filesize > (20 * 1024 ** 3): # Max 20 GiB.
1425 raise CommandException(
1426 'Maximum throughput file size parameter (-s) is 20 GiB.')
1427 if o == '-t':
1428 self.diag_tests = []
1429 for test_name in a.strip().split(','):
1430 if test_name.lower() not in self.ALL_DIAG_TESTS:
1431 raise CommandException("List of test names (-t) contains invalid "
1432 "test name '%s'." % test_name)
1433 self.diag_tests.append(test_name)
1434 if o == '-m':
1435 pieces = a.split(':')
1436 if len(pieces) != 2:
1437 raise CommandException(
1438 "Invalid metadata key-value combination '%s'." % a)
1439 key, value = pieces
1440 self.metadata_keys[key] = value
1441 if o == '-o':
1442 self.output_file = os.path.abspath(a)
1443 if o == '-i':
1444 self.input_file = os.path.abspath(a)
1445 if not os.path.isfile(self.input_file):
1446 raise CommandException("Invalid input file (-i): '%s'." % a)
1447 try:
1448 with open(self.input_file, 'r') as f:
1449 self.results = json.load(f)
1450 self.logger.info("Read input file: '%s'.", self.input_file)
1451 except ValueError:
1452 raise CommandException("Could not decode input file (-i): '%s'." %
1454 return
1455 if not self.args:
1456 self.RaiseWrongNumberOfArgumentsException()
1458 self.bucket_url = StorageUrlFromString(self.args[0])
1459 self.provider = self.bucket_url.scheme
1460 if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
1461 raise CommandException('The perfdiag command requires a URL that '
1462 'specifies a bucket.\n"%s" is not '
1463 'valid.' % self.args[0])
1464 # Ensure the bucket exists.
1465 self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
1466 provider=self.bucket_url.scheme,
1467 fields=['id'])
1468 self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
1469 socket.timeout, httplib.BadStatusLine,
1470 ServiceException]
# Command entry point.
def RunCommand(self):
  """Runs the perfdiag command; invoked by gsutil.

  Returns:
    0 once results have been displayed.
  """
  self._ParseArgs()

  # With an input file there is nothing to measure; just render its contents.
  if self.input_file:
    self._DisplayResults()
    return 0

  # We turn off retries in the underlying boto library because the
  # _RunOperation function handles errors manually so it can count them.
  boto.config.set('Boto', 'num_retries', '0')

  self.logger.info(
      'Number of iterations to run: %d\n'
      'Base bucket URI: %s\n'
      'Number of processes: %d\n'
      'Number of threads: %d\n'
      'Throughput file size: %s\n'
      'Diagnostics to run: %s',
      self.num_iterations,
      self.bucket_url,
      self.processes,
      self.threads,
      MakeHumanReadable(self.thru_filesize),
      (', '.join(self.diag_tests)))

  def RecordCounters(phase):
    """Stores netstat info and (on Linux) disk counters for a phase."""
    tcp_stats = self._GetTcpStats()
    if tcp_stats:
      self.results['sysinfo']['netstat_%s' % phase] = tcp_stats
    if IS_LINUX:
      self.results['sysinfo']['disk_counters_%s' % phase] = (
          self._GetDiskCounters())

  try:
    self._SetUp()

    # Collect generic system info.
    self._CollectSysInfo()
    # Counters are captured before the tests and again afterwards.
    RecordCounters('start')
    # Record bucket URL.
    self.results['bucket_uri'] = str(self.bucket_url)
    self.results['json_format'] = 'perfdiag'
    self.results['metadata'] = self.metadata_keys

    # Each selected diagnostic runs in this fixed order.
    runners = (
        (self.LAT, self._RunLatencyTests),
        (self.RTHRU, self._RunReadThruTests),
        (self.WTHRU, self._RunWriteThruTests),
        (self.LIST, self._RunListTests),
    )
    for test_name, run_test in runners:
      if test_name in self.diag_tests:
        run_test()

    RecordCounters('end')

    self.results.update({
        'total_requests': self.total_requests,
        'request_errors': self.request_errors,
        'error_responses_by_code': self.error_responses_by_code,
        'connection_breaks': self.connection_breaks,
        'gsutil_version': gslib.VERSION,
        'boto_version': boto.__version__,
    })

    self._DisplayResults()
  finally:
    # TODO: Install signal handlers so this is performed in response to a
    # terminating signal; consider multi-threaded object deletes during
    # cleanup so it happens quickly.
    self._TearDown()

  return 0
class UploadObjectTuple(object):
  """Picklable record holding the metadata needed for an insert object call."""

  def __init__(self, bucket_name, object_name, filepath=None, md5=None,
               contents=None):
    """Creates an upload tuple.

    The payload is described either by filepath alone or by the pair
    (md5, contents); the two forms are mutually exclusive, and md5 and
    contents must always be given together.

    Args:
      bucket_name: Name of the bucket to upload to.
      object_name: Name of the object to upload to.
      filepath: A file path located in self.file_contents and self.file_md5s.
      md5: The MD5 hash of the object being uploaded.
      contents: The contents of the file to be uploaded.

    Raises:
      InvalidArgument: if the arguments are invalid.
    """
    if filepath:
      if md5 or contents is not None:
        raise InvalidArgument(
            'Only one of filepath or (md5 + contents) may be specified.')
    elif not (md5 and contents is not None):
      raise InvalidArgument(
          'Both md5 and contents must be specified.')

    self.bucket_name = bucket_name
    self.object_name = object_name
    self.filepath = filepath
    self.md5 = md5
    self.contents = contents
def StorageUrlToUploadObjectMetadata(storage_url):
  """Builds the apitools Object message that names an upload target.

  Args:
    storage_url: URL object providing IsCloudUrl(), IsObject(), object_name
        and bucket_name.

  Returns:
    An apitools_messages.Object whose name and bucket are taken from
    storage_url.

  Raises:
    CommandException: If storage_url is not a cloud URL naming an object.
  """
  if not (storage_url.IsCloudUrl() and storage_url.IsObject()):
    raise CommandException('Non-cloud URL upload target %s was created in '
                           'perfdiag implemenation.' % storage_url)

  target = apitools_messages.Object()
  target.name = storage_url.object_name
  target.bucket = storage_url.bucket_name
  return target