2 * Test client to test the NBD server. Doesn't do anything useful, except
3 * checking that the server does, actually, work.
5 * Note that the only 'real' test is to check the client against a kernel. If
6 * it works here but does not work in the kernel, then that's most likely a bug
7 * in this program and/or in nbd-server.
9 * Copyright(c) 2006 Wouter Verhelst
11 * This program is Free Software; you can redistribute it and/or modify it
12 * under the terms of the GNU General Public License as published by the Free
13 * Software Foundation, in version 2.
15 * This program is distributed in the hope that it will be useful, but WITHOUT
16 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
17 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
20 * You should have received a copy of the GNU General Public License along with
21 * this program; if not, write to the Free Software Foundation, Inc., 51
22 * Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
29 #include <sys/types.h>
30 #include <sys/socket.h>
38 #include <netinet/in.h>
41 #define MY_NAME "nbd-tester-client"
44 static gchar errstr
[1024];
45 const static int errstr_len
=1024;
49 static int looseordering
= 0;
51 static gchar
* transactionlog
= "nbd-tester-client.tr";
55 CONNECTION_TYPE_CONNECT
,
56 CONNECTION_TYPE_INIT_PASSWD
,
57 CONNECTION_TYPE_CLISERV
,
62 CONNECTION_CLOSE_PROPERLY
,
63 CONNECTION_CLOSE_FAST
,
69 struct nbd_request req
;
70 struct reqcontext
* next
;
71 struct reqcontext
* prev
;
75 struct reqcontext
* head
;
76 struct reqcontext
* tail
;
102 void rclist_unlink(struct rclist
* l
, struct reqcontext
* p
) {
104 struct reqcontext
* prev
= p
->prev
;
105 struct reqcontext
* next
= p
->next
;
107 /* Fix link to previous */
124 /* Add a new list item to the tail */
125 void rclist_addtail(struct rclist
* l
, struct reqcontext
* p
) {
130 g_warning("addtail found list tail has a next pointer");
137 g_warning("addtail found no list tail but a list head");
146 void chunklist_unlink(struct chunklist
* l
, struct chunk
* p
) {
148 struct chunk
* prev
= p
->prev
;
149 struct chunk
* next
= p
->next
;
151 /* Fix link to previous */
168 /* Add a new list item to the tail */
169 void chunklist_addtail(struct chunklist
* l
, struct chunk
* p
) {
174 g_warning("addtail found list tail has a next pointer");
181 g_warning("addtail found no list tail but a list head");
190 /* Add some new bytes to a chunklist */
191 void addbuffer(struct chunklist
* l
, void * data
, uint64_t len
) {
193 uint64_t size
= 64*1024;
194 struct chunk
* pchunk
;
198 /* First see if there is a current chunk, and if it has space */
199 if (l
->tail
&& l
->tail
->space
) {
200 uint64_t towrite
= len
;
201 if (towrite
> l
->tail
->space
)
202 towrite
= l
->tail
->space
;
203 memcpy(l
->tail
->writeptr
, data
, towrite
);
204 l
->tail
->length
+= towrite
;
205 l
->tail
->space
-= towrite
;
206 l
->tail
->writeptr
+= towrite
;
212 /* We still need to write more, so prepare a new chunk */
213 if ((NULL
== (buf
= malloc(size
))) || (NULL
== (pchunk
= calloc(1, sizeof(struct chunk
))))) {
214 g_critical("Out of memory");
218 pchunk
->buffer
= buf
;
219 pchunk
->readptr
= buf
;
220 pchunk
->writeptr
= buf
;
221 pchunk
->space
= size
;
222 chunklist_addtail(l
, pchunk
);
228 /* returns 0 on success, -1 on failure */
229 int writebuffer(int fd
, struct chunklist
* l
) {
231 struct chunk
* pchunk
= NULL
;
241 if (!(pchunk
->length
) || !(pchunk
->readptr
)) {
242 chunklist_unlink(l
, pchunk
);
243 free(pchunk
->buffer
);
249 /* OK we have a chunk with some data in */
250 res
= write(fd
, pchunk
->readptr
, pchunk
->length
);
255 pchunk
->length
-= res
;
256 pchunk
->readptr
+= res
;
257 if (!pchunk
->length
) {
258 chunklist_unlink(l
, pchunk
);
259 free(pchunk
->buffer
);
267 #define TEST_WRITE (1<<0)
268 #define TEST_FLUSH (1<<1)
270 int timeval_subtract (struct timeval
*result
, struct timeval
*x
,
272 if (x
->tv_usec
< y
->tv_usec
) {
273 int nsec
= (y
->tv_usec
- x
->tv_usec
) / 1000000 + 1;
274 y
->tv_usec
-= 1000000 * nsec
;
278 if (x
->tv_usec
- y
->tv_usec
> 1000000) {
279 int nsec
= (x
->tv_usec
- y
->tv_usec
) / 1000000;
280 y
->tv_usec
+= 1000000 * nsec
;
284 result
->tv_sec
= x
->tv_sec
- y
->tv_sec
;
285 result
->tv_usec
= x
->tv_usec
- y
->tv_usec
;
287 return x
->tv_sec
< y
->tv_sec
;
290 double timeval_diff_to_double (struct timeval
* x
, struct timeval
* y
) {
292 timeval_subtract(&r
, x
, y
);
293 return r
.tv_sec
* 1.0 + r
.tv_usec
/1000000.0;
296 static inline int read_all(int f
, void *buf
, size_t len
) {
301 if((res
=read(f
, buf
, len
)) <=0) {
304 snprintf(errstr
, errstr_len
, "Read failed: %s", strerror(errno
));
314 static inline int write_all(int f
, void *buf
, size_t len
) {
319 if((res
=write(f
, buf
, len
)) <=0) {
322 snprintf(errstr
, errstr_len
, "Write failed: %s", strerror(errno
));
332 #define READ_ALL_ERRCHK(f, buf, len, whereto, errmsg...) if((read_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; }
333 #define READ_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) if((read_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; }
335 #define WRITE_ALL_ERRCHK(f, buf, len, whereto, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); goto whereto; }
336 #define WRITE_ALL_ERR_RT(f, buf, len, whereto, rval, errmsg...) if((write_all(f, buf, len))<=0) { snprintf(errstr, errstr_len, ##errmsg); retval = rval; goto whereto; }
338 int setup_connection(gchar
*hostname
, int port
, gchar
* name
, CONNECTION_TYPE ctype
, int* serverflags
) {
340 struct hostent
*host
;
341 struct sockaddr_in addr
;
343 uint64_t mymagic
= (name
? opts_magic
: cliserv_magic
);
348 if(ctype
<CONNECTION_TYPE_CONNECT
)
350 if((sock
=socket(PF_INET
, SOCK_STREAM
, IPPROTO_TCP
))<0) {
351 strncpy(errstr
, strerror(errno
), errstr_len
);
355 if(!(host
=gethostbyname(hostname
))) {
356 strncpy(errstr
, hstrerror(h_errno
), errstr_len
);
359 addr
.sin_family
=AF_INET
;
360 addr
.sin_port
=htons(port
);
361 addr
.sin_addr
.s_addr
=*((int *) host
->h_addr
);
362 if((connect(sock
, (struct sockaddr
*)&addr
, sizeof(addr
))<0)) {
363 strncpy(errstr
, strerror(errno
), errstr_len
);
366 if(ctype
<CONNECTION_TYPE_INIT_PASSWD
)
368 READ_ALL_ERRCHK(sock
, buf
, strlen(INIT_PASSWD
), err_open
, "Could not read INIT_PASSWD: %s", strerror(errno
));
370 snprintf(errstr
, errstr_len
, "Server closed connection");
373 if(strncmp(buf
, INIT_PASSWD
, strlen(INIT_PASSWD
))) {
374 snprintf(errstr
, errstr_len
, "INIT_PASSWD does not match");
377 if(ctype
<CONNECTION_TYPE_CLISERV
)
379 READ_ALL_ERRCHK(sock
, &tmp64
, sizeof(tmp64
), err_open
, "Could not read cliserv_magic: %s", strerror(errno
));
381 if(tmp64
!= mymagic
) {
382 strncpy(errstr
, "mymagic does not match", errstr_len
);
385 if(ctype
<CONNECTION_TYPE_FULL
)
388 READ_ALL_ERRCHK(sock
, &size
, sizeof(size
), err_open
, "Could not read size: %s", strerror(errno
));
390 READ_ALL_ERRCHK(sock
, buf
, 128, err_open
, "Could not read data: %s", strerror(errno
));
394 READ_ALL_ERRCHK(sock
, buf
, sizeof(uint16_t), err_open
, "Could not read reserved field: %s", strerror(errno
));
396 WRITE_ALL_ERRCHK(sock
, &tmp32
, sizeof(tmp32
), err_open
, "Could not write reserved field: %s", strerror(errno
));
398 tmp64
= htonll(opts_magic
);
399 WRITE_ALL_ERRCHK(sock
, &tmp64
, sizeof(tmp64
), err_open
, "Could not write magic: %s", strerror(errno
));
401 tmp32
= htonl(NBD_OPT_EXPORT_NAME
);
402 WRITE_ALL_ERRCHK(sock
, &tmp32
, sizeof(tmp32
), err_open
, "Could not write option: %s", strerror(errno
));
403 tmp32
= htonl((uint32_t)strlen(name
));
404 WRITE_ALL_ERRCHK(sock
, &tmp32
, sizeof(tmp32
), err_open
, "Could not write name length: %s", strerror(errno
));
405 WRITE_ALL_ERRCHK(sock
, name
, strlen(name
), err_open
, "Could not write name:: %s", strerror(errno
));
406 READ_ALL_ERRCHK(sock
, &size
, sizeof(size
), err_open
, "Could not read size: %s", strerror(errno
));
409 READ_ALL_ERRCHK(sock
, &flags
, sizeof(uint16_t), err_open
, "Could not read flags: %s", strerror(errno
));
410 flags
= ntohs(flags
);
411 *serverflags
= flags
;
412 READ_ALL_ERRCHK(sock
, buf
, 124, err_open
, "Could not read reserved zeroes: %s", strerror(errno
));
422 int close_connection(int sock
, CLOSE_TYPE type
) {
423 struct nbd_request req
;
427 case CONNECTION_CLOSE_PROPERLY
:
428 req
.magic
=htonl(NBD_REQUEST_MAGIC
);
429 req
.type
=htonl(NBD_CMD_DISC
);
430 memcpy(&(req
.handle
), &(counter
), sizeof(counter
));
434 if(write(sock
, &req
, sizeof(req
))<0) {
435 snprintf(errstr
, errstr_len
, "Could not write to socket: %s", strerror(errno
));
438 case CONNECTION_CLOSE_FAST
:
440 snprintf(errstr
, errstr_len
, "Could not close socket: %s", strerror(errno
));
445 g_critical("Your compiler is on crack!"); /* or I am buggy */
451 int read_packet_check_header(int sock
, size_t datasize
, long long int curhandle
) {
452 struct nbd_reply rep
;
456 READ_ALL_ERR_RT(sock
, &rep
, sizeof(rep
), end
, -1, "Could not read reply header: %s", strerror(errno
));
457 rep
.magic
=ntohl(rep
.magic
);
458 rep
.error
=ntohl(rep
.error
);
459 if(rep
.magic
!=NBD_REPLY_MAGIC
) {
460 snprintf(errstr
, errstr_len
, "Received package with incorrect reply_magic. Index of sent packages is %lld (0x%llX), received handle is %lld (0x%llX). Received magic 0x%lX, expected 0x%lX", (long long int)curhandle
, (long long unsigned int)curhandle
, (long long int)*((u64
*)rep
.handle
), (long long unsigned int)*((u64
*)rep
.handle
), (long unsigned int)rep
.magic
, (long unsigned int)NBD_REPLY_MAGIC
);
465 snprintf(errstr
, errstr_len
, "Received error from server: %ld (0x%lX). Handle is %lld (0x%llX).", (long int)rep
.error
, (long unsigned int)rep
.error
, (long long int)(*((u64
*)rep
.handle
)), (long long unsigned int)*((u64
*)rep
.handle
));
470 READ_ALL_ERR_RT(sock
, &buf
, datasize
, end
, -1, "Could not read data: %s", strerror(errno
));
476 int oversize_test(gchar
* hostname
, int port
, char* name
, int sock
,
477 char sock_is_open
, char close_sock
, int testflags
) {
479 struct nbd_request req
;
480 struct nbd_reply rep
;
483 pid_t G_GNUC_UNUSED mypid
= getpid();
484 char buf
[((1024*1024)+sizeof(struct nbd_request
)/2)<<1];
487 /* This should work */
489 if((sock
=setup_connection(hostname
, port
, name
, CONNECTION_TYPE_FULL
, &serverflags
))<0) {
490 g_warning("Could not open socket: %s", errstr
);
495 req
.magic
=htonl(NBD_REQUEST_MAGIC
);
496 req
.type
=htonl(NBD_CMD_READ
);
497 req
.len
=htonl(1024*1024);
498 memcpy(&(req
.handle
),&i
,sizeof(i
));
500 WRITE_ALL_ERR_RT(sock
, &req
, sizeof(req
), err
, -1, "Could not write request: %s", strerror(errno
));
501 printf("%d: testing oversized request: %d: ", getpid(), ntohl(req
.len
));
502 READ_ALL_ERR_RT(sock
, &rep
, sizeof(struct nbd_reply
), err
, -1, "Could not read reply header: %s", strerror(errno
));
503 READ_ALL_ERR_RT(sock
, &buf
, ntohl(req
.len
), err
, -1, "Could not read data: %s", strerror(errno
));
505 snprintf(errstr
, errstr_len
, "Received unexpected error: %d", rep
.error
);
511 /* This probably should not work */
512 i
++; req
.from
=htonll(i
);
513 req
.len
= htonl(ntohl(req
.len
) + sizeof(struct nbd_request
) / 2);
514 WRITE_ALL_ERR_RT(sock
, &req
, sizeof(req
), err
, -1, "Could not write request: %s", strerror(errno
));
515 printf("%d: testing oversized request: %d: ", getpid(), ntohl(req
.len
));
516 READ_ALL_ERR_RT(sock
, &rep
, sizeof(struct nbd_reply
), err
, -1, "Could not read reply header: %s", strerror(errno
));
517 READ_ALL_ERR_RT(sock
, &buf
, ntohl(req
.len
), err
, -1, "Could not read data: %s", strerror(errno
));
519 printf("Received expected error\n");
525 /* ... unless this works, too */
526 i
++; req
.from
=htonll(i
);
527 req
.len
= htonl(ntohl(req
.len
) << 1);
528 WRITE_ALL_ERR_RT(sock
, &req
, sizeof(req
), err
, -1, "Could not write request: %s", strerror(errno
));
529 printf("%d: testing oversized request: %d: ", getpid(), ntohl(req
.len
));
530 READ_ALL_ERR_RT(sock
, &rep
, sizeof(struct nbd_reply
), err
, -1, "Could not read reply header: %s", strerror(errno
));
531 READ_ALL_ERR_RT(sock
, &buf
, ntohl(req
.len
), err
, -1, "Could not read data: %s", strerror(errno
));
537 if((rep
.error
&& !got_err
) || (!rep
.error
&& got_err
)) {
538 printf("Received unexpected error\n");
545 int throughput_test(gchar
* hostname
, int port
, char* name
, int sock
,
546 char sock_is_open
, char close_sock
, int testflags
) {
549 struct nbd_request req
;
553 struct timeval start
;
557 char speedchar
[2] = { '\0', '\0' };
560 signed int do_write
=TRUE
;
561 pid_t mypid
= getpid();
564 if (!(testflags
& TEST_WRITE
))
565 testflags
&= ~TEST_FLUSH
;
567 memset (writebuf
, 'X', 1024);
570 if((sock
=setup_connection(hostname
, port
, name
, CONNECTION_TYPE_FULL
, &serverflags
))<0) {
571 g_warning("Could not open socket: %s", errstr
);
576 if ((testflags
& TEST_FLUSH
) && ((serverflags
& (NBD_FLAG_SEND_FLUSH
| NBD_FLAG_SEND_FUA
))
577 != (NBD_FLAG_SEND_FLUSH
| NBD_FLAG_SEND_FUA
))) {
578 snprintf(errstr
, errstr_len
, "Server did not supply flush capability flags");
582 req
.magic
=htonl(NBD_REQUEST_MAGIC
);
584 if(gettimeofday(&start
, NULL
)<0) {
586 snprintf(errstr
, errstr_len
, "Could not measure start time: %s", strerror(errno
));
589 for(i
=0;i
+1024<=size
;i
+=1024) {
591 int sendfua
= (testflags
& TEST_FLUSH
) && (((i
>>10) & 15) == 3);
592 int sendflush
= (testflags
& TEST_FLUSH
) && (((i
>>10) & 15) == 11);
593 req
.type
=htonl((testflags
& TEST_WRITE
)?NBD_CMD_WRITE
:NBD_CMD_READ
);
595 req
.type
= htonl(NBD_CMD_WRITE
| NBD_CMD_FLAG_FUA
);
596 memcpy(&(req
.handle
),&i
,sizeof(i
));
598 if (write_all(sock
, &req
, sizeof(req
)) <0) {
602 if (testflags
& TEST_WRITE
) {
603 if (write_all(sock
, writebuf
, 1024) <0) {
608 printf("%d: Requests(+): %d\r", (int)mypid
, ++requests
);
610 long long int j
= i
^ (1LL<<63);
611 req
.type
= htonl(NBD_CMD_FLUSH
);
612 memcpy(&(req
.handle
),&j
,sizeof(j
));
614 if (write_all(sock
, &req
, sizeof(req
)) <0) {
618 printf("%d: Requests(+): %d\r", (int)mypid
, ++requests
);
626 select(sock
+1, &set
, NULL
, NULL
, &tv
);
627 if(FD_ISSET(sock
, &set
)) {
628 /* Okay, there's something ready for
630 if(read_packet_check_header(sock
, (testflags
& TEST_WRITE
)?0:1024, i
)<0) {
634 printf("%d: Requests(-): %d\r", (int)mypid
, --requests
);
636 } while FD_ISSET(sock
, &set
);
637 /* Now wait until we can write again or until a second have
638 * passed, whichever comes first*/
643 do_write
=select(sock
+1,NULL
,&set
,NULL
,&tv
);
644 if(!do_write
) printf("Select finished\n");
646 snprintf(errstr
, errstr_len
, "select: %s", strerror(errno
));
651 /* Now empty the read buffer */
657 select(sock
+1, &set
, NULL
, NULL
, &tv
);
658 if(FD_ISSET(sock
, &set
)) {
659 /* Okay, there's something ready for
661 read_packet_check_header(sock
, (testflags
& TEST_WRITE
)?0:1024, i
);
662 printf("%d: Requests(-): %d\r", (int)mypid
, --requests
);
666 if(gettimeofday(&stop
, NULL
)<0) {
668 snprintf(errstr
, errstr_len
, "Could not measure end time: %s", strerror(errno
));
671 timespan
=timeval_diff_to_double(&stop
, &start
);
685 g_message("%d: Throughput %s test (%s flushes) complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags
& TEST_WRITE
)?"write":"read", (testflags
& TEST_FLUSH
)?"with":"without", timespan
, speed
, speedchar
);
689 close_connection(sock
, CONNECTION_CLOSE_PROPERLY
);
696 * fill 512 byte buffer 'buf' with a hashed selection of interesting data based
697 * only on handle and blknum. The first word is blknum, and the second handle, for ease
698 * of understanding. Things with handle 0 are blank.
700 static inline void makebuf(char *buf
, uint64_t seq
, uint64_t blknum
) {
701 uint64_t x
= ((uint64_t)blknum
) ^ (seq
<< 32) ^ (seq
>> 32);
702 uint64_t* p
= (uint64_t*)buf
;
708 for (i
= 0; i
<512/sizeof(uint64_t); i
++) {
711 x
+=0xFEEDA1ECDEADBEEFULL
+i
+(((uint64_t)i
)<<56);
713 x
= x
^ (x
<<s
) ^ (x
>>(64-s
)) ^ 0xAA55AA55AA55AA55ULL
^ seq
;
717 static inline int checkbuf(char *buf
, uint64_t seq
, uint64_t blknum
) {
718 uint64_t cmp
[64]; // 512/8 = 64
719 makebuf((char *)cmp
, seq
, blknum
);
720 return memcmp(cmp
, buf
, 512)?-1:0;
723 static inline void dumpcommand(char * text
, uint32_t command
)
725 #ifdef DEBUG_COMMANDS
726 command
=ntohl(command
);
728 switch (command
& NBD_CMD_MASK_COMMAND
) {
730 ctext
="NBD_CMD_READ";
733 ctext
="NBD_CMD_WRITE";
736 ctext
="NBD_CMD_DISC";
739 ctext
="NBD_CMD_FLUSH";
745 printf("%s: %s [%s] (0x%08x)\n",
748 (command
& NBD_CMD_FLAG_FUA
)?"FUA":"NONE",
753 /* return an unused handle */
754 uint64_t getrandomhandle(GHashTable
*phash
) {
758 /* RAND_MAX may be as low as 2^15 */
759 for (i
= 1 ; i
<=5; i
++)
760 handle
^= random() ^ (handle
<< 15);
761 } while (g_hash_table_lookup(phash
, &handle
));
765 int integrity_test(gchar
* hostname
, int port
, char* name
, int sock
,
766 char sock_is_open
, char close_sock
, int testflags
) {
767 struct nbd_reply rep
;
771 struct timeval start
;
775 char speedchar
[2] = { '\0', '\0' };
778 pid_t G_GNUC_UNUSED mypid
= getpid();
780 char *blkhashname
=NULL
;
781 struct blkitem
*blkhash
= NULL
;
784 uint64_t processed
=0;
787 int readtransactionfile
= 1;
789 struct rclist txqueue
={NULL
, NULL
, 0};
790 struct rclist inflight
={NULL
, NULL
, 0};
791 struct chunklist txbuf
={NULL
, NULL
, 0};
793 GHashTable
*handlehash
= g_hash_table_new(g_int64_hash
, g_int64_equal
);
797 if((sock
=setup_connection(hostname
, port
, name
, CONNECTION_TYPE_FULL
, &serverflags
))<0) {
798 g_warning("Could not open socket: %s", errstr
);
804 if ((serverflags
& (NBD_FLAG_SEND_FLUSH
| NBD_FLAG_SEND_FUA
))
805 != (NBD_FLAG_SEND_FLUSH
| NBD_FLAG_SEND_FUA
))
806 g_warning("Server flags do not support FLUSH and FUA - these may error");
809 blkhashname
=strdup("/tmp/blkarray-XXXXXX");
810 if (!blkhashname
|| (-1 == (blkhashfd
= mkstemp(blkhashname
)))) {
811 g_warning("Could not open temp file: %s", strerror(errno
));
816 /* use tmpnam here to avoid further feature test nightmare */
817 if (-1 == (blkhashfd
= open(blkhashname
=strdup(tmpnam(NULL
)),
819 S_IRUSR
|S_IWUSR
|S_IRGRP
|S_IROTH
))) {
820 g_warning("Could not open temp file: %s", strerror(errno
));
825 /* Ensure space freed if we die */
826 if (-1 == unlink(blkhashname
)) {
827 g_warning("Could not unlink temp file: %s", strerror(errno
));
832 if (-1 == lseek(blkhashfd
, (off_t
)((size
>>9)*sizeof(struct blkitem
)), SEEK_SET
)) {
833 g_warning("Could not llseek temp file: %s", strerror(errno
));
838 if (-1 == write(blkhashfd
, "\0", 1)) {
839 g_warning("Could not write temp file: %s", strerror(errno
));
844 if (NULL
== (blkhash
= mmap(NULL
,
845 (size
>>9)*sizeof(struct blkitem
),
846 PROT_READ
| PROT_WRITE
,
850 g_warning("Could not mmap temp file: %s", strerror(errno
));
855 if (-1 == (logfd
= open(transactionlog
, O_RDONLY
)))
857 g_warning("Could open log file: %s", strerror(errno
));
862 if(gettimeofday(&start
, NULL
)<0) {
864 snprintf(errstr
, errstr_len
, "Could not measure start time: %s", strerror(errno
));
868 while (readtransactionfile
|| txqueue
.numitems
|| txbuf
.numitems
|| inflight
.numitems
) {
875 struct reqcontext
* prc
;
881 if (readtransactionfile
)
882 FD_SET(logfd
, &rset
);
883 if ((!blocked
&& txqueue
.numitems
) || txbuf
.numitems
)
885 if (inflight
.numitems
)
889 ret
= select(1+((sock
>logfd
)?sock
:logfd
), &rset
, &wset
, NULL
, &tv
);
892 snprintf(errstr
, errstr_len
, "Timeout reading from socket");
895 g_warning("Could not mmap temp file: %s", errstr
);
899 /* We know we've got at least one thing to do here then */
901 /* Get a command from the transaction log */
902 if (FD_ISSET(logfd
, &rset
)) {
904 /* Read a request or reply from the transaction file */
905 READ_ALL_ERRCHK(logfd
,
909 "Could not read transaction log: %s",
911 magic
= ntohl(magic
);
913 case NBD_REQUEST_MAGIC
:
914 if (NULL
== (prc
= calloc(1, sizeof(struct reqcontext
)))) {
916 snprintf(errstr
, errstr_len
, "Could not allocate request");
919 READ_ALL_ERRCHK(logfd
,
920 sizeof(magic
)+(char *)&(prc
->req
),
921 sizeof(struct nbd_request
)-sizeof(magic
),
923 "Could not read transaction log: %s",
925 prc
->req
.magic
= htonl(NBD_REQUEST_MAGIC
);
926 memcpy(prc
->orighandle
, prc
->req
.handle
, 8);
928 if ((ntohl(prc
->req
.type
) & NBD_CMD_MASK_COMMAND
) == NBD_CMD_DISC
) {
929 /* no more to read; don't enqueue as no reply
930 * we will disconnect manually at the end
932 readtransactionfile
= 0;
935 dumpcommand("Enqueuing command", prc
->req
.type
);
936 rclist_addtail(&txqueue
, prc
);
940 case NBD_REPLY_MAGIC
:
941 READ_ALL_ERRCHK(logfd
,
942 sizeof(magic
)+(char *)(&rep
),
943 sizeof(struct nbd_reply
)-sizeof(magic
),
945 "Could not read transaction log: %s",
950 snprintf(errstr
, errstr_len
, "Transaction log file contained errored transaction");
954 /* We do not need to consume data on a read reply as there is
959 snprintf(errstr
, errstr_len
, "Could not measure start time: %08x", magic
);
964 /* See if we have a write we can do */
965 if (FD_ISSET(sock
, &wset
))
967 if ((!(txqueue
.head
) && !(txbuf
.head
)) || blocked
)
968 g_warning("Socket write FD set but we shouldn't have been interested");
970 /* If there is no buffered data, generate some */
971 if (!blocked
&& !(txbuf
.head
) && (NULL
!= (prc
= txqueue
.head
)))
973 if (ntohl(prc
->req
.magic
) != NBD_REQUEST_MAGIC
) {
975 g_warning("Asked to write a request without a magic number");
979 command
= ntohl(prc
->req
.type
);
980 from
= ntohll(prc
->req
.from
);
981 len
= ntohl(prc
->req
.len
);
983 /* First check whether we can touch this command at all. If this
984 * command is a read, and there is an inflight write, OR if this
985 * command is a write, and there is an inflight read or write, then
986 * we need to leave the command alone and signal that we are blocked
996 uint64_t blknum
= cfrom
>>9;
998 snprintf(errstr
, errstr_len
, "offset %llx beyond size %llx",
999 (long long int) cfrom
, (long long int)size
);
1002 if (blkhash
[blknum
].inflightw
||
1003 (blkhash
[blknum
].inflightr
&&
1004 ((command
& NBD_CMD_MASK_COMMAND
)==NBD_CMD_WRITE
))) {
1016 rclist_unlink(&txqueue
, prc
);
1017 rclist_addtail(&inflight
, prc
);
1019 dumpcommand("Sending command", prc
->req
.type
);
1020 /* we rewrite the handle as they otherwise may not be unique */
1021 *((uint64_t*)(prc
->req
.handle
))=getrandomhandle(handlehash
);
1022 g_hash_table_insert(handlehash
, prc
->req
.handle
, prc
);
1023 addbuffer(&txbuf
, &(prc
->req
), sizeof(struct nbd_request
));
1024 switch (command
& NBD_CMD_MASK_COMMAND
) {
1028 uint64_t blknum
= from
>>9;
1031 snprintf(errstr
, errstr_len
, "offset %llx beyond size %llx",
1032 (long long int) from
, (long long int)size
);
1035 (blkhash
[blknum
].inflightw
)++;
1036 /* work out what we should be writing */
1037 makebuf(dbuf
, prc
->seq
, blknum
);
1038 addbuffer(&txbuf
, dbuf
, 512);
1046 uint64_t blknum
= from
>>9;
1048 snprintf(errstr
, errstr_len
, "offset %llx beyond size %llx",
1049 (long long int) from
, (long long int)size
);
1052 (blkhash
[blknum
].inflightr
)++;
1062 snprintf(errstr
, errstr_len
, "Incomprehensible command: %08x", command
);
1071 /* there should be some now */
1072 if (writebuffer(sock
, &txbuf
)<0) {
1074 snprintf(errstr
, errstr_len
, "Failed to write to socket buffer: %s", strerror(errno
));
1080 /* See if there is a reply to be processed from the socket */
1081 if(FD_ISSET(sock
, &rset
)) {
1082 /* Okay, there's something ready for
1085 READ_ALL_ERRCHK(sock
,
1087 sizeof(struct nbd_reply
),
1089 "Could not read from server socket: %s",
1092 if (rep
.magic
!= htonl(NBD_REPLY_MAGIC
)) {
1094 snprintf(errstr
, errstr_len
, "Bad magic from server");
1100 snprintf(errstr
, errstr_len
, "Server errored a transaction");
1105 memcpy(&handle
,rep
.handle
,8);
1106 prc
= g_hash_table_lookup(handlehash
, &handle
);
1109 snprintf(errstr
, errstr_len
, "Unrecognised handle in reply: 0x%llX", *(long long unsigned int*)(rep
.handle
));
1112 if (!g_hash_table_remove(handlehash
, &handle
)) {
1114 snprintf(errstr
, errstr_len
, "Could not remove handle from hash: 0x%llX", *(long long unsigned int*)(rep
.handle
));
1118 if (prc
->req
.magic
!= htonl(NBD_REQUEST_MAGIC
)) {
1120 snprintf(errstr
, errstr_len
, "Bad magic in inflight data: %08x", prc
->req
.magic
);
1124 dumpcommand("Processing reply to command", prc
->req
.type
);
1125 command
= ntohl(prc
->req
.type
);
1126 from
= ntohll(prc
->req
.from
);
1127 len
= ntohl(prc
->req
.len
);
1129 switch (command
& NBD_CMD_MASK_COMMAND
) {
1132 uint64_t blknum
= from
>>9;
1135 snprintf(errstr
, errstr_len
, "offset %llx beyond size %llx",
1136 (long long int) from
, (long long int)size
);
1139 READ_ALL_ERRCHK(sock
,
1143 "Could not read data: %s",
1145 if (--(blkhash
[blknum
].inflightr
) <0 ) {
1146 snprintf(errstr
, errstr_len
, "Received a read reply for offset %llx when not in flight",
1147 (long long int) from
);
1150 /* work out what we was written */
1151 if (checkbuf(dbuf
, blkhash
[blknum
].seq
, blknum
)) {
1153 snprintf(errstr
, errstr_len
, "Bad reply data: I wanted blk %08x, seq %08x but I got (at a guess) blk %08x, seq %08x",
1154 (unsigned int) blknum
,
1155 blkhash
[blknum
].seq
,
1156 ((uint32_t *)(dbuf
))[0],
1157 ((uint32_t *)(dbuf
))[1]
1167 /* subsequent reads should get data with this seq*/
1169 uint64_t blknum
= from
>>9;
1170 if (--(blkhash
[blknum
].inflightw
) <0 ) {
1171 snprintf(errstr
, errstr_len
, "Received a write reply for offset %llx when not in flight",
1172 (long long int) from
);
1175 blkhash
[blknum
].seq
=(uint32_t)(prc
->seq
);
1185 rclist_unlink(&inflight
, prc
);
1186 prc
->req
.magic
=0; /* so a duplicate reply is detected */
1190 if (!(printer
++ % 1000) || !(readtransactionfile
|| txqueue
.numitems
|| inflight
.numitems
) )
1191 printf("%d: Seq %08lld Queued: %08d Inflight: %08d Done: %08lld\r",
1193 (long long int) seq
,
1196 (long long int) processed
);
1202 if (gettimeofday(&stop
, NULL
)<0) {
1204 snprintf(errstr
, errstr_len
, "Could not measure end time: %s", strerror(errno
));
1207 timespan
=timeval_diff_to_double(&stop
, &start
);
1208 speed
=xfer
/timespan
;
1221 g_message("%d: Integrity %s test complete. Took %.3f seconds to complete, %.3f%sib/s", (int)getpid(), (testflags
& TEST_WRITE
)?"write":"read", timespan
, speed
, speedchar
);
1225 close_connection(sock
, CONNECTION_CLOSE_PROPERLY
);
1228 if (size
&& blkhash
)
1229 munmap(blkhash
, (size
>>9)*sizeof(struct blkitem
));
1231 if (blkhashfd
!= -1)
1241 g_warning("%s",errstr
);
1243 g_hash_table_destroy(handlehash
);
1248 void handle_nonopt(char* opt
, gchar
** hostname
, long int* p
) {
1249 static int nonopt
=0;
1253 *hostname
=g_strdup(opt
);
1257 *p
=(strtol(opt
, NULL
, 0));
1258 if(*p
==LONG_MIN
||*p
==LONG_MAX
) {
1259 g_critical("Could not parse port number: %s", strerror(errno
));
1266 typedef int (*testfunc
)(gchar
*, int, char*, int, char, char, int);
1268 int main(int argc
, char**argv
) {
1276 testfunc test
= throughput_test
;
1278 /* Ignore SIGPIPE as we want to pick up the error from write() */
1279 signal (SIGPIPE
, SIG_IGN
);
1282 g_message("%d: Not enough arguments", (int)getpid());
1283 g_message("%d: Usage: %s <hostname> <port>", (int)getpid(), argv
[0]);
1284 g_message("%d: Or: %s <hostname> -N <exportname> [<port>]", (int)getpid(), argv
[0]);
1288 while((c
=getopt(argc
, argv
, "-N:t:owfil"))>=0) {
1291 handle_nonopt(optarg
, &hostname
, &p
);
1294 name
=g_strdup(optarg
);
1300 transactionlog
=g_strdup(optarg
);
1309 testflags
|=TEST_WRITE
;
1312 testflags
|=TEST_FLUSH
;
1315 test
=integrity_test
;
1320 while(optind
< argc
) {
1321 handle_nonopt(argv
[optind
++], &hostname
, &p
);
1324 if(test(hostname
, (int)p
, name
, sock
, FALSE
, TRUE
, testflags
)<0) {
1325 g_warning("Could not run test: %s", errstr
);