2 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like. Any license provided herein, whether implied or
15 * otherwise, applies only to this software file. Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24 * Mountain View, CA 94043, or:
29 * will open or create each file on the command line, and start a series
32 * aio is done in a rotating loop. first file1 gets 8 requests, then
33 * file2, then file3 etc. As each file finishes writing, it is switched
36 * io buffers are aligned in case you want to do raw io
38 * compile with gcc -Wall -o aio-stress aio-stress.c -laio -lpthread
40 * run aio-stress -h to see the options
42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
44 #define _FILE_OFFSET_BITS 64
45 #define PROG_VERSION "0.21"
53 #include <sys/types.h>
67 #define RUN_FOREVER -1
70 #define O_DIRECT 040000 /* direct disk access hint */
86 * various globals, these are effectively read only by the time the threads
90 unsigned long page_size_mask
;
93 int latency_stats
= 0;
94 int completion_latency_stats
= 0;
96 int iterations
= RUN_FOREVER
;
97 int max_io_submit
= 0;
98 long rec_len
= 64 * 1024;
101 int num_contexts
= 1;
102 off_t context_offset
= 2 * 1024 * 1024;
103 int fsync_stages
= 1;
106 char *unaligned_buffer
= NULL
;
107 char *aligned_buffer
= NULL
;
108 int padded_reclen
= 0;
111 char *verify_buf
= NULL
;
112 int unlink_files
= 0;
117 /* pthread mutexes and other globals for keeping the threads in sync */
118 pthread_cond_t stage_cond
= PTHREAD_COND_INITIALIZER
;
119 pthread_mutex_t stage_mutex
= PTHREAD_MUTEX_INITIALIZER
;
120 int threads_ending
= 0;
121 int threads_starting
= 0;
122 struct timeval global_stage_start_time
;
123 struct thread_info
*global_thread_info
;
126 * latencies during io_submit are measured, these are the
127 * granularities for deviations
130 int deviations
[DEVIATIONS
] = { 100, 250, 500, 1000, 5000, 10000 };
136 double deviations
[DEVIATIONS
];
139 /* container for a series of operations to a file */
141 /* already open file descriptor, valid for whatever operation you want */
144 /* starting byte of the operation */
147 /* ending byte of the operation */
150 /* size of the read/write buffer */
153 /* max number of pending requests before a wait is triggered */
156 /* current number of pending requests */
159 /* last error, zero if there were none */
162 /* total number of errors hit. */
165 /* read,write, random, etc */
168 /* number of ios that will get sent to aio */
171 /* number of ios we've already sent */
174 /* last offset used in an io operation */
177 /* stonewalled = 1 when we got cut off before submitting all our ios */
180 /* list management */
181 struct io_oper
*next
;
182 struct io_oper
*prev
;
184 struct timeval start_time
;
189 /* a single io, and all the tracking needed for it */
191 /* note, iocb must go first! */
194 /* pointer to parent io operation struct */
195 struct io_oper
*io_oper
;
200 /* size of the aligned buffer (record size) */
203 /* state of this io unit (free, pending, done) */
206 /* result of last operation */
209 struct io_unit
*next
;
211 struct timeval io_start_time
; /* time of io_submit */
218 /* allocated array of io_unit structs */
221 /* list of io units available for io */
222 struct io_unit
*free_ious
;
224 /* number of io units in the ios array */
227 /* number of io units in flight */
228 int num_global_pending
;
230 /* preallocated array of iocb pointers, only used in run_active */
233 /* preallocated array of events */
234 struct io_event
*events
;
236 /* size of the events array */
237 int num_global_events
;
239 /* latency stats for io_submit */
240 struct io_latency io_submit_latency
;
242 /* list of operations still in progress, and of those finished */
243 struct io_oper
*active_opers
;
244 struct io_oper
*finished_opers
;
246 /* number of files this thread is doing io on */
249 /* how much io this thread did in the last stage */
250 double stage_mb_trans
;
252 /* latency completion stats i/o time from io_submit until io_getevents */
253 struct io_latency io_completion_latency
;
/*
 * Return the number of seconds between start_tv and stop_tv in double
 * precision.  An interval that comes out negative is reported as 0.
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
    double sec;
    double usec;
    double ret;

    sec = stop_tv->tv_sec - start_tv->tv_sec;
    usec = stop_tv->tv_usec - start_tv->tv_usec;
    if (sec > 0 && usec < 0) {
        /* borrow one second so the microsecond part is not negative */
        sec--;
        usec += 1000000;
    }
    ret = sec + usec / (double)1000000;
    /*
     * NOTE(review): the statements after the sum were not visible when this
     * block was restored; the clamp to zero should be confirmed against the
     * upstream aio-stress.c.
     */
    if (ret < 0)
        ret = 0;
    return ret;
}
/*
 * Return the number of seconds between start_tv and the current time
 * (as reported by gettimeofday) in double precision.
 */
static double time_since_now(struct timeval *start_tv)
{
    struct timeval stop_time;

    gettimeofday(&stop_time, NULL);
    return time_since(start_tv, &stop_time);
}
286 * Add latency info to latency struct
288 static void calc_latency(struct timeval
*start_tv
, struct timeval
*stop_tv
,
289 struct io_latency
*lat
)
293 delta
= time_since(start_tv
, stop_tv
);
294 delta
= delta
* 1000;
296 if (delta
> lat
->max
)
298 if (!lat
->min
|| delta
< lat
->min
)
301 lat
->total_lat
+= delta
;
302 for (i
= 0 ; i
< DEVIATIONS
; i
++) {
303 if (delta
< deviations
[i
]) {
304 lat
->deviations
[i
]++;
310 static void oper_list_add(struct io_oper
*oper
, struct io_oper
**list
)
314 oper
->prev
= oper
->next
= oper
;
317 oper
->prev
= (*list
)->prev
;
319 (*list
)->prev
->next
= oper
;
320 (*list
)->prev
= oper
;
324 static void oper_list_del(struct io_oper
*oper
, struct io_oper
**list
)
326 if ((*list
)->next
== (*list
)->prev
&& *list
== (*list
)->next
) {
330 oper
->prev
->next
= oper
->next
;
331 oper
->next
->prev
= oper
->prev
;
336 /* worker func to check error fields in the io unit */
337 static int check_finished_io(struct io_unit
*io
) {
339 if (io
->res
!= io
->buf_size
) {
342 fstat(io
->io_oper
->fd
, &s
);
345 * If file size is large enough for the read, then this short
348 if ((io
->io_oper
->rw
== READ
|| io
->io_oper
->rw
== RREAD
) &&
349 s
.st_size
> (io
->iocb
.u
.c
.offset
+ io
->res
)) {
351 fprintf(stderr
, "io err %lu (%s) op %d, off %Lu size %d\n",
352 io
->res
, strerror(-io
->res
), io
->iocb
.aio_lio_opcode
,
353 io
->iocb
.u
.c
.offset
, io
->buf_size
);
354 io
->io_oper
->last_err
= io
->res
;
355 io
->io_oper
->num_err
++;
359 if (verify
&& io
->io_oper
->rw
== READ
) {
360 if (memcmp(io
->buf
, verify_buf
, io
->io_oper
->reclen
)) {
361 fprintf(stderr
, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
362 io
->io_oper
->file_name
, io
->iocb
.u
.c
.offset
);
364 for (i
= 0 ; i
< io
->io_oper
->reclen
; i
++) {
365 if (io
->buf
[i
] != verify_buf
[i
]) {
366 fprintf(stderr
, "%d:%c:%c ", i
, io
->buf
[i
], verify_buf
[i
]);
369 fprintf(stderr
, "\n");
376 /* worker func to check the busy bits and get an io unit ready for use */
377 static int grab_iou(struct io_unit
*io
, struct io_oper
*oper
) {
378 if (io
->busy
== IO_PENDING
)
381 io
->busy
= IO_PENDING
;
387 char *stage_name(int rw
) {
394 return "random write";
396 return "random read";
401 static inline double oper_mb_trans(struct io_oper
*oper
) {
402 return ((double)oper
->started_ios
* (double)oper
->reclen
) /
403 (double)(1024 * 1024);
406 static void print_time(struct io_oper
*oper
) {
411 runtime
= time_since_now(&oper
->start_time
);
412 mb
= oper_mb_trans(oper
);
414 fprintf(stderr
, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
415 stage_name(oper
->rw
), oper
->file_name
, tput
, mb
, runtime
);
418 static void print_lat(char *str
, struct io_latency
*lat
) {
419 double avg
= lat
->total_lat
/ lat
->total_io
;
421 double total_counted
= 0;
422 fprintf(stderr
, "%s min %.2f avg %.2f max %.2f\n\t",
423 str
, lat
->min
, avg
, lat
->max
);
425 for (i
= 0 ; i
< DEVIATIONS
; i
++) {
426 fprintf(stderr
, " %.0f < %d", lat
->deviations
[i
], deviations
[i
]);
427 total_counted
+= lat
->deviations
[i
];
429 if (total_counted
&& lat
->total_io
- total_counted
)
430 fprintf(stderr
, " < %.0f", lat
->total_io
- total_counted
);
431 fprintf(stderr
, "\n");
432 memset(lat
, 0, sizeof(*lat
));
435 static void print_latency(struct thread_info
*t
)
437 struct io_latency
*lat
= &t
->io_submit_latency
;
438 print_lat("latency", lat
);
441 static void print_completion_latency(struct thread_info
*t
)
443 struct io_latency
*lat
= &t
->io_completion_latency
;
444 print_lat("completion latency", lat
);
448 * updates the fields in the io operation struct that belongs to this
449 * io unit, and make the io unit reusable again
451 void finish_io(struct thread_info
*t
, struct io_unit
*io
, long result
,
452 struct timeval
*tv_now
) {
453 struct io_oper
*oper
= io
->io_oper
;
455 calc_latency(&io
->io_start_time
, tv_now
, &t
->io_completion_latency
);
458 io
->next
= t
->free_ious
;
461 t
->num_global_pending
--;
462 check_finished_io(io
);
463 if (oper
->num_pending
== 0 &&
464 (oper
->started_ios
== oper
->total_ios
|| oper
->stonewalled
))
470 int read_some_events(struct thread_info
*t
) {
471 struct io_unit
*event_io
;
472 struct io_event
*event
;
475 int min_nr
= io_iter
;
476 struct timeval stop_time
;
478 if (t
->num_global_pending
< io_iter
)
479 min_nr
= t
->num_global_pending
;
482 nr
= io_getevents(t
->io_ctx
, min_nr
, t
->num_global_events
, t
->events
,NULL
);
484 nr
= io_getevents(t
->io_ctx
, t
->num_global_events
, t
->events
, NULL
);
489 gettimeofday(&stop_time
, NULL
);
490 for (i
= 0 ; i
< nr
; i
++) {
491 event
= t
->events
+ i
;
492 event_io
= (struct io_unit
*)((unsigned long)event
->obj
);
493 finish_io(t
, event_io
, event
->res
, &stop_time
);
499 * finds a free io unit, waiting for pending requests if required. returns
500 * null if none could be found
502 static struct io_unit
*find_iou(struct thread_info
*t
, struct io_oper
*oper
)
504 struct io_unit
*event_io
;
509 event_io
= t
->free_ious
;
510 t
->free_ious
= t
->free_ious
->next
;
511 if (grab_iou(event_io
, oper
)) {
512 fprintf(stderr
, "io unit on free list but not free\n");
517 nr
= read_some_events(t
);
521 fprintf(stderr
, "no free ious after read_some_events\n");
526 * wait for all pending requests for this io operation to finish
528 static int io_oper_wait(struct thread_info
*t
, struct io_oper
*oper
) {
529 struct io_event event
;
530 struct io_unit
*event_io
;
536 if (oper
->num_pending
== 0)
539 /* this func is not speed sensitive, no need to go wild reading
540 * more than one event at a time
543 while(io_getevents(t
->io_ctx
, 1, 1, &event
, NULL
) > 0) {
545 while(io_getevents(t
->io_ctx
, 1, &event
, NULL
) > 0) {
547 struct timeval tv_now
;
548 event_io
= (struct io_unit
*)((unsigned long)event
.obj
);
550 gettimeofday(&tv_now
, NULL
);
551 finish_io(t
, event_io
, event
.res
, &tv_now
);
553 if (oper
->num_pending
== 0)
558 fprintf(stderr
, "%u errors on oper, last %u\n",
559 oper
->num_err
, oper
->last_err
);
564 off_t
random_byte_offset(struct io_oper
*oper
) {
566 off_t rand_byte
= oper
->start
;
570 range
= (oper
->end
- oper
->start
) / (1024 * 1024);
571 if ((page_size_mask
+1) > (1024 * 1024))
572 offset
= (page_size_mask
+1) / (1024 * 1024);
578 /* find a random mb offset */
579 num
= 1 + (int)((double)range
* rand() / (RAND_MAX
+ 1.0 ));
580 rand_byte
+= num
* 1024 * 1024;
582 /* find a random byte offset */
583 num
= 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX
+ 1.0));
586 num
= (num
+ page_size_mask
) & ~page_size_mask
;
589 if (rand_byte
+ oper
->reclen
> oper
->end
) {
590 rand_byte
-= oper
->reclen
;
596 * build an aio iocb for an operation, based on oper->rw and the
597 * last offset used. This finds the struct io_unit that will be attached
598 * to the iocb, and things are ready for submission to aio after this
601 * returns null on error
603 static struct io_unit
*build_iocb(struct thread_info
*t
, struct io_oper
*oper
)
608 io
= find_iou(t
, oper
);
610 fprintf(stderr
, "unable to find io unit\n");
616 io_prep_pwrite(&io
->iocb
,oper
->fd
, io
->buf
, oper
->reclen
,
618 oper
->last_offset
+= oper
->reclen
;
621 io_prep_pread(&io
->iocb
,oper
->fd
, io
->buf
, oper
->reclen
,
623 oper
->last_offset
+= oper
->reclen
;
626 rand_byte
= random_byte_offset(oper
);
627 oper
->last_offset
= rand_byte
;
628 io_prep_pread(&io
->iocb
,oper
->fd
, io
->buf
, oper
->reclen
,
632 rand_byte
= random_byte_offset(oper
);
633 oper
->last_offset
= rand_byte
;
634 io_prep_pwrite(&io
->iocb
,oper
->fd
, io
->buf
, oper
->reclen
,
644 * wait for any pending requests, and then free all ram associated with
645 * an operation. returns the last error the operation hit (zero means none)
648 finish_oper(struct thread_info
*t
, struct io_oper
*oper
)
650 unsigned long last_err
;
652 io_oper_wait(t
, oper
);
653 last_err
= oper
->last_err
;
654 if (oper
->num_pending
> 0) {
655 fprintf(stderr
, "oper num_pending is %d\n", oper
->num_pending
);
663 * allocates an io operation and fills in all the fields. returns
666 static struct io_oper
*
667 create_oper(int fd
, int rw
, off_t start
, off_t end
, int reclen
, int depth
,
668 int iter
, char *file_name
)
670 struct io_oper
*oper
;
672 oper
= malloc (sizeof(*oper
));
674 fprintf(stderr
, "unable to allocate io oper\n");
677 memset(oper
, 0, sizeof(*oper
));
682 oper
->last_offset
= oper
->start
;
684 oper
->reclen
= reclen
;
686 oper
->total_ios
= (oper
->end
- oper
->start
) / oper
->reclen
;
687 oper
->file_name
= file_name
;
693 * does setup on num_ios worth of iocbs, but does not actually
696 int build_oper(struct thread_info
*t
, struct io_oper
*oper
, int num_ios
,
697 struct iocb
**my_iocbs
)
702 if (oper
->started_ios
== 0)
703 gettimeofday(&oper
->start_time
, NULL
);
706 num_ios
= oper
->total_ios
;
708 if ((oper
->started_ios
+ num_ios
) > oper
->total_ios
)
709 num_ios
= oper
->total_ios
- oper
->started_ios
;
711 for( i
= 0 ; i
< num_ios
; i
++) {
712 io
= build_iocb(t
, oper
);
716 my_iocbs
[i
] = &io
->iocb
;
722 * runs through the iocbs in the array provided and updates
723 * counters in the associated oper struct
725 static void update_iou_counters(struct iocb
**my_iocbs
, int nr
,
726 struct timeval
*tv_now
)
730 for (i
= 0 ; i
< nr
; i
++) {
731 io
= (struct io_unit
*)(my_iocbs
[i
]);
732 io
->io_oper
->num_pending
++;
733 io
->io_oper
->started_ios
++;
734 io
->io_start_time
= *tv_now
; /* set time of io_submit */
738 /* starts some io for a given file, returns zero if all went well */
739 int run_built(struct thread_info
*t
, int num_ios
, struct iocb
**my_iocbs
)
742 struct timeval start_time
;
743 struct timeval stop_time
;
746 gettimeofday(&start_time
, NULL
);
747 ret
= io_submit(t
->io_ctx
, num_ios
, my_iocbs
);
748 gettimeofday(&stop_time
, NULL
);
749 calc_latency(&start_time
, &stop_time
, &t
->io_submit_latency
);
751 if (ret
!= num_ios
) {
752 /* some ios got through */
754 update_iou_counters(my_iocbs
, ret
, &stop_time
);
756 t
->num_global_pending
+= ret
;
760 * we've used all the requests allocated in aio_init, wait and
763 if (ret
> 0 || ret
== -EAGAIN
) {
765 if ((ret
= read_some_events(t
) > 0)) {
768 fprintf(stderr
, "ret was %d and now is %d\n", ret
, old_ret
);
773 fprintf(stderr
, "ret %d (%s) on io_submit\n", ret
, strerror(-ret
));
776 update_iou_counters(my_iocbs
, ret
, &stop_time
);
777 t
->num_global_pending
+= ret
;
782 * changes oper->rw to the next in a command sequence, or returns zero
783 * to say this operation is really, completely done for
785 static int restart_oper(struct io_oper
*oper
) {
790 /* this switch falls through */
793 if (stages
& (1 << READ
))
796 if (!new_rw
&& stages
& (1 << RWRITE
))
799 if (!new_rw
&& stages
& (1 << RREAD
))
804 oper
->started_ios
= 0;
805 oper
->last_offset
= oper
->start
;
806 oper
->stonewalled
= 0;
809 * we're restarting an operation with pending requests, so the
810 * timing info won't be printed by finish_io. Printing it here
812 if (oper
->num_pending
)
821 static int oper_runnable(struct io_oper
*oper
) {
825 /* first context is always runnable, if started_ios > 0, no need to
826 * redo the calculations
828 if (oper
->started_ios
|| oper
->start
== 0)
831 * only the sequential phases force delays in starting */
832 if (oper
->rw
>= RWRITE
)
834 ret
= fstat(oper
->fd
, &buf
);
839 if (S_ISREG(buf
.st_mode
) && buf
.st_size
< oper
->start
)
845 * runs through all the io operations on the active list, and starts
846 * a chunk of io on each. If any io operations are completely finished,
847 * it either switches them to the next stage or puts them on the
850 * this function stops after max_io_submit iocbs are sent down the
851 * pipe, even if it has not yet touched all the operations on the
852 * active list. Any operations that have finished are moved onto
853 * the finished_opers list.
855 static int run_active_list(struct thread_info
*t
,
859 struct io_oper
*oper
;
860 struct io_oper
*built_opers
= NULL
;
861 struct iocb
**my_iocbs
= t
->iocbs
;
865 oper
= t
->active_opers
;
867 if (!oper_runnable(oper
)) {
869 if (oper
== t
->active_opers
)
873 ret
= build_oper(t
, oper
, io_iter
, my_iocbs
);
877 oper_list_del(oper
, &t
->active_opers
);
878 oper_list_add(oper
, &built_opers
);
879 oper
= t
->active_opers
;
880 if (num_built
+ io_iter
> max_io_submit
)
886 ret
= run_built(t
, num_built
, t
->iocbs
);
888 fprintf(stderr
, "error %d on run_built\n", ret
);
893 oper_list_del(oper
, &built_opers
);
894 oper_list_add(oper
, &t
->active_opers
);
895 if (oper
->started_ios
== oper
->total_ios
) {
896 oper_list_del(oper
, &t
->active_opers
);
897 oper_list_add(oper
, &t
->finished_opers
);
907 if (use_shm
!= USE_SHM
)
910 ret
= shmctl(shm_id
, IPC_RMID
, &ds
);
912 perror("shmctl IPC_RMID");
916 void aio_setup(io_context_t
*io_ctx
, int n
)
918 int res
= io_queue_init(n
, io_ctx
);
920 fprintf(stderr
, "io_queue_setup(%d) returned %d (%s)\n",
921 n
, res
, strerror(-res
));
927 * allocate io operation and event arrays for a given thread
929 int setup_ious(struct thread_info
*t
,
930 int num_files
, int depth
,
931 int reclen
, int max_io_submit
) {
933 size_t bytes
= num_files
* depth
* sizeof(*t
->ios
);
935 t
->ios
= malloc(bytes
);
937 fprintf(stderr
, "unable to allocate io units\n");
940 memset(t
->ios
, 0, bytes
);
942 for (i
= 0 ; i
< depth
* num_files
; i
++) {
943 t
->ios
[i
].buf
= aligned_buffer
;
944 aligned_buffer
+= padded_reclen
;
945 t
->ios
[i
].buf_size
= reclen
;
947 memset(t
->ios
[i
].buf
, 'b', reclen
);
949 memset(t
->ios
[i
].buf
, 0, reclen
);
950 t
->ios
[i
].next
= t
->free_ious
;
951 t
->free_ious
= t
->ios
+ i
;
954 verify_buf
= aligned_buffer
;
955 memset(verify_buf
, 'b', reclen
);
958 t
->iocbs
= malloc(sizeof(struct iocb
*) * max_io_submit
);
960 fprintf(stderr
, "unable to allocate iocbs\n");
964 memset(t
->iocbs
, 0, max_io_submit
* sizeof(struct iocb
*));
966 t
->events
= malloc(sizeof(struct io_event
) * depth
* num_files
);
968 fprintf(stderr
, "unable to allocate ram for events\n");
971 memset(t
->events
, 0, num_files
* sizeof(struct io_event
)*depth
);
973 t
->num_global_ios
= num_files
* depth
;
974 t
->num_global_events
= t
->num_global_ios
;
988 * The buffers used for file data are allocated as a single big
989 * malloc, and then each thread and operation takes a piece and uses
990 * that for file data. This lets us do a large shm or bigpages alloc
991 * and without trying to find a special place in each thread to map the
994 int setup_shared_mem(int num_threads
, int num_files
, int depth
,
995 int reclen
, int max_io_submit
)
1000 padded_reclen
= (reclen
+ page_size_mask
) / (page_size_mask
+1);
1001 padded_reclen
= padded_reclen
* (page_size_mask
+1);
1002 total_ram
= num_files
* depth
* padded_reclen
+ num_threads
;
1004 total_ram
+= padded_reclen
;
1006 if (use_shm
== USE_MALLOC
) {
1007 p
= malloc(total_ram
+ page_size_mask
);
1008 } else if (use_shm
== USE_SHM
) {
1009 shm_id
= shmget(IPC_PRIVATE
, total_ram
, IPC_CREAT
| 0700);
1015 p
= shmat(shm_id
, (char *)0x50000000, 0);
1016 if ((long)p
== -1) {
1020 /* won't really be dropped until we shmdt */
1022 } else if (use_shm
== USE_SHMFS
) {
1023 char mmap_name
[16]; /* /dev/shm/ + null + XXXXXX */
1026 strcpy(mmap_name
, "/dev/shm/XXXXXX");
1027 fd
= mkstemp(mmap_name
);
1033 ftruncate(fd
, total_ram
);
1035 p
= mmap((char *)0x50000000, total_ram
,
1036 PROT_READ
| PROT_WRITE
, MAP_SHARED
, fd
, 0);
1038 if (p
== MAP_FAILED
) {
1044 fprintf(stderr
, "unable to allocate buffers\n");
1047 unaligned_buffer
= p
;
1048 p
= (char*)((intptr_t) (p
+ page_size_mask
) & ~page_size_mask
);
1054 if (unaligned_buffer
)
1055 free(unaligned_buffer
);
1060 * runs through all the thread_info structs and calculates a combined
1063 void global_thread_throughput(struct thread_info
*t
, char *this_stage
) {
1065 double runtime
= time_since_now(&global_stage_start_time
);
1066 double total_mb
= 0;
1067 double min_trans
= 0;
1069 for (i
= 0 ; i
< num_threads
; i
++) {
1070 total_mb
+= global_thread_info
[i
].stage_mb_trans
;
1071 if (!min_trans
|| t
->stage_mb_trans
< min_trans
)
1072 min_trans
= t
->stage_mb_trans
;
1075 fprintf(stderr
, "%s throughput (%.2f MB/s) ", this_stage
,
1076 total_mb
/ runtime
);
1077 fprintf(stderr
, "%.2f MB in %.2fs", total_mb
, runtime
);
1079 fprintf(stderr
, " min transfer %.2fMB", min_trans
);
1080 fprintf(stderr
, "\n");
1085 /* this is the meat of the state machine. There is a list of
1086 * active operations structs, and as each one finishes the required
1087 * io it is moved to a list of finished operations. Once they have
1088 * all finished whatever stage they were in, they are given the chance
1089 * to restart and pick a different stage (read/write/random read etc)
1091 * various timings are printed in between the stages, along with
1092 * thread synchronization if there is more than one thread.
1094 int worker(struct thread_info
*t
)
1096 struct io_oper
*oper
;
1097 char *this_stage
= NULL
;
1098 struct timeval stage_time
;
1103 aio_setup(&t
->io_ctx
, 512);
1106 if (num_threads
> 1) {
1107 pthread_mutex_lock(&stage_mutex
);
1109 if (threads_starting
== num_threads
) {
1111 gettimeofday(&global_stage_start_time
, NULL
);
1112 pthread_cond_broadcast(&stage_cond
);
1114 while (threads_starting
!= num_threads
)
1115 pthread_cond_wait(&stage_cond
, &stage_mutex
);
1116 pthread_mutex_unlock(&stage_mutex
);
1118 if (t
->active_opers
) {
1119 this_stage
= stage_name(t
->active_opers
->rw
);
1120 gettimeofday(&stage_time
, NULL
);
1121 t
->stage_mb_trans
= 0;
1125 /* first we send everything through aio */
1126 while(t
->active_opers
&& (cnt
< iterations
|| iterations
== RUN_FOREVER
)) {
1127 if (stonewall
&& threads_ending
) {
1128 oper
= t
->active_opers
;
1129 oper
->stonewalled
= 1;
1130 oper_list_del(oper
, &t
->active_opers
);
1131 oper_list_add(oper
, &t
->finished_opers
);
1133 run_active_list(t
, io_iter
, max_io_submit
);
1140 if (completion_latency_stats
)
1141 print_completion_latency(t
);
1143 /* then we wait for all the operations to finish */
1144 oper
= t
->finished_opers
;
1148 io_oper_wait(t
, oper
);
1150 } while(oper
!= t
->finished_opers
);
1152 /* then we do an fsync to get the timing for any future operations
1153 * right, and check to see if any of these need to get restarted
1155 oper
= t
->finished_opers
;
1159 t
->stage_mb_trans
+= oper_mb_trans(oper
);
1160 if (restart_oper(oper
)) {
1161 oper_list_del(oper
, &t
->finished_opers
);
1162 oper_list_add(oper
, &t
->active_opers
);
1163 oper
= t
->finished_opers
;
1167 if (oper
== t
->finished_opers
)
1171 if (t
->stage_mb_trans
&& t
->num_files
> 0) {
1172 double seconds
= time_since_now(&stage_time
);
1173 fprintf(stderr
, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1174 t
- global_thread_info
, this_stage
, t
->stage_mb_trans
/seconds
,
1175 t
->stage_mb_trans
, seconds
);
1178 if (num_threads
> 1) {
1179 pthread_mutex_lock(&stage_mutex
);
1181 if (threads_ending
== num_threads
) {
1182 threads_starting
= 0;
1183 pthread_cond_broadcast(&stage_cond
);
1184 global_thread_throughput(t
, this_stage
);
1186 while(threads_ending
!= num_threads
)
1187 pthread_cond_wait(&stage_cond
, &stage_mutex
);
1188 pthread_mutex_unlock(&stage_mutex
);
1191 /* someone got restarted, go back to the beginning */
1192 if (t
->active_opers
&& (cnt
< iterations
|| iterations
== RUN_FOREVER
)) {
1197 /* finally, free all the ram */
1198 while(t
->finished_opers
) {
1199 oper
= t
->finished_opers
;
1200 oper_list_del(oper
, &t
->finished_opers
);
1201 status
= finish_oper(t
, oper
);
1204 if (t
->num_global_pending
) {
1205 fprintf(stderr
, "global num pending is %d\n", t
->num_global_pending
);
1207 io_queue_release(t
->io_ctx
);
1212 typedef void * (*start_routine
)(void *);
1213 int run_workers(struct thread_info
*t
, int num_threads
)
1219 for(i
= 0 ; i
< num_threads
; i
++) {
1220 ret
= pthread_create(&t
[i
].tid
, NULL
, (start_routine
)worker
, t
+ i
);
1222 perror("pthread_create");
1226 for(i
= 0 ; i
< num_threads
; i
++) {
1227 ret
= pthread_join(t
[i
].tid
, (void *)&thread_ret
);
1229 perror("pthread_join");
/*
 * Parse a size argument such as "400", "400k", "400M" or "2g".
 *
 * size_arg: decimal number with an optional one-letter unit suffix.  A
 *           trailing character greater than '9' is stripped from the
 *           string in place before the number is read.
 * mult:     multiplier applied when no recognised suffix is present.
 *
 * Recognised suffixes (either case): g = 1024^3, m = 1024^2, k = 1024,
 * b = 1.  Returns number * multiplier; an empty string yields 0.
 *
 * NOTE(review): only the 'g' multiplier was visible when this block was
 * restored; the m/k/b cases follow the usage text and the original line
 * layout — confirm against upstream.
 */
off_t
parse_size(char *size_arg, off_t mult) {
    size_t len = strlen(size_arg);
    char c;
    long long num;

    /* nothing to index or parse; avoids reading size_arg[-1] */
    if (len == 0)
        return 0;

    c = size_arg[len - 1];
    if (c > '9')
        size_arg[len - 1] = '\0';

    /* strtoll instead of atoi so values beyond INT_MAX are not truncated */
    num = strtoll(size_arg, NULL, 10);

    switch (c) {
    case 'g':
    case 'G':
        mult = 1024 * 1024 * 1024;
        break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
        break;
    case 'k':
    case 'K':
        mult = 1024;
        break;
    case 'b':
    case 'B':
        mult = 1;
        break;
    default:
        break;
    }
    return mult * num;
}
1267 void print_usage(void) {
1268 printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1269 printf(" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1270 printf(" file1 [file2 ...]\n");
1271 printf("\t-a size in KB at which to align buffers\n");
1272 printf("\t-b max number of iocbs to give io_submit at once\n");
1273 printf("\t-c number of io contexts per file\n");
1274 printf("\t-C offset between contexts, default 2MB\n");
1275 printf("\t-s size in MB of the test file(s), default 1024MB\n");
1276 printf("\t-r record size in KB used for each io, default 64KB\n");
1277 printf("\t-d number of pending aio requests for each file, default 64\n");
1278 printf("\t-i number of ios per file sent before switching\n\t to the next file, default 8\n");
1279 printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1280 printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1281 printf("\t-S Use O_SYNC for writes\n");
1282 printf("\t-o add an operation to the list: write=0, read=1,\n");
1283 printf("\t random write=2, random read=3.\n");
1284 printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1285 printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1286 printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1287 printf("\t-n no fsyncs between write stage and read stage\n");
1288 printf("\t-l print io_submit latencies after each stage\n");
1289 printf("\t-L print io completion latencies after each stage\n");
1290 printf("\t-t number of threads to run\n");
1291 printf("\t-u unlink files after completion\n");
1292 printf("\t-v verification of bytes written\n");
1293 printf("\t-x turn off thread stonewalling\n");
1294 printf("\t-h this message\n");
1295 printf("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1296 printf("\t translate to 400KB, 400MB and 400GB\n");
1297 printf("version %s\n", PROG_VERSION
);
1300 int main(int ac
, char **av
)
1307 off_t file_size
= 1 * 1024 * 1024 * 1024;
1308 int first_stage
= WRITE
;
1309 struct io_oper
*oper
;
1313 struct thread_info
*t
;
1315 page_size_mask
= getpagesize() - 1;
1318 c
= getopt(ac
, av
, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1324 page_size_mask
= parse_size(optarg
, 1024);
1328 num_contexts
= atoi(optarg
);
1331 context_offset
= parse_size(optarg
, 1024 * 1024);
1333 max_io_submit
= atoi(optarg
);
1336 file_size
= parse_size(optarg
, 1024 * 1024);
1339 depth
= atoi(optarg
);
1342 rec_len
= parse_size(optarg
, 1024);
1345 io_iter
= atoi(optarg
);
1348 iterations
= atoi(optarg
);
1357 completion_latency_stats
= 1;
1360 if (!strcmp(optarg
, "shm")) {
1361 fprintf(stderr
, "using ipc shm\n");
1363 } else if (!strcmp(optarg
, "shmfs")) {
1364 fprintf(stderr
, "using /dev/shm for buffers\n");
1365 use_shm
= USE_SHMFS
;
1371 fprintf(stderr
, "adding stage %s\n", stage_name(i
));
1374 o_direct
= O_DIRECT
;
1380 num_threads
= atoi(optarg
);
1399 * make sure we don't try to submit more ios than we have allocated
1402 if (depth
< io_iter
) {
1404 fprintf(stderr
, "dropping io_iter to %d\n", io_iter
);
1412 num_files
= ac
- optind
;
1414 if (num_threads
> (num_files
* num_contexts
)) {
1415 num_threads
= num_files
* num_contexts
;
1416 fprintf(stderr
, "dropping thread count to the number of contexts %d\n",
1420 t
= malloc(num_threads
* sizeof(*t
));
1425 global_thread_info
= t
;
1427 /* by default, allow a huge number of iocbs to be sent towards
1431 max_io_submit
= num_files
* io_iter
* num_contexts
;
1434 * make sure we don't try to submit more ios than max_io_submit allows
1436 if (max_io_submit
< io_iter
) {
1437 io_iter
= max_io_submit
;
1438 fprintf(stderr
, "dropping io_iter to %d\n", io_iter
);
1442 stages
= (1 << WRITE
) | (1 << READ
) | (1 << RREAD
) | (1 << RWRITE
);
1444 for (i
= 0 ; i
< LAST_STAGE
; i
++) {
1445 if (stages
& (1 << i
)) {
1447 fprintf(stderr
, "starting with %s\n", stage_name(i
));
1453 if (file_size
< num_contexts
* context_offset
) {
1454 fprintf(stderr
, "file size %Lu too small for %d contexts\n",
1455 file_size
, num_contexts
);
1459 fprintf(stderr
, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size
/ (1024 * 1024), rec_len
/ 1024, depth
, io_iter
);
1460 fprintf(stderr
, "max io_submit %d, buffer alignment set to %luKB\n",
1461 max_io_submit
, (page_size_mask
+ 1)/1024);
1462 fprintf(stderr
, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
1463 num_threads
, num_files
, num_contexts
,
1464 context_offset
/ (1024 * 1024), verify
? "on" : "off");
1465 /* open all the files and do any required setup for them */
1466 for (i
= optind
; i
< ac
; i
++) {
1468 for (j
= 0 ; j
< num_contexts
; j
++) {
1469 thread_index
= open_fds
% num_threads
;
1472 rwfd
= open(av
[i
], O_CREAT
| O_RDWR
| o_direct
| o_sync
, 0600);
1475 oper
= create_oper(rwfd
, first_stage
, j
* context_offset
,
1476 file_size
- j
* context_offset
, rec_len
,
1477 depth
, io_iter
, av
[i
]);
1479 fprintf(stderr
, "error in create_oper\n");
1482 oper_list_add(oper
, &t
[thread_index
].active_opers
);
1483 t
[thread_index
].num_files
++;
1486 if (setup_shared_mem(num_threads
, num_files
* num_contexts
,
1487 depth
, rec_len
, max_io_submit
))
1491 for (i
= 0 ; i
< num_threads
; i
++) {
1492 if (setup_ious(&t
[i
], t
[i
].num_files
, depth
, rec_len
, max_io_submit
))
1495 if (num_threads
> 1){
1496 printf("Running multi thread version num_threads:%d\n", num_threads
);
1497 run_workers(t
, num_threads
);
1499 printf("Running single thread version \n");
1503 for (i
= optind
; i
< ac
; i
++) {
1504 printf("Cleaning up file %s \n", av
[i
]);