 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
/* Parameters of the copy test and the running totals that the
   completion callbacks update. */
var (
	/* Value passed as "size=" to both nbdkit instances, and the
	   total that bytes_read and bytes_written are checked against. */
	disk_size uint = 512 * 1024 * 1024

	/* Presumably the size of a single read request -- its use is not
	   visible in this part of the file. */
	bs uint64 = 65536

	/* Upper bound on the source handle's in-flight command count
	   below which asynch_copy submits further reads. */
	max_reads_in_flight uint = 16

	/* Sum of buf.Size over all completed reads. */
	bytes_read uint

	/* Sum of buf.Size over all completed writes. */
	bytes_written uint
)
/* Functions to handle FdSet.
   The integer type of syscall.FdSet.Bits differs between platforms
   (its elements are 32 bits wide on some and 64 bits wide on others),
   so the element width is measured instead of being assumed to be 64. */

/* Return the number of bits in one element of set.Bits. */
func fdset_word_bits(set *syscall.FdSet) int {
	width := 0
	/* "set.Bits[0]&0 | 1" is the value 1 in the element's own integer
	   type.  Shifting it left until it becomes zero counts the bits
	   of that type, whether it is signed or unsigned. */
	for probe := set.Bits[0]&0 | 1; probe != 0; probe <<= 1 {
		width++
	}
	return width
}

/* Add fd to set.
   fd must be non-negative and smaller than the number of bits in
   set.Bits; an fd beyond the end of the array panics with an
   index-out-of-range error. */
func fdset_set(set *syscall.FdSet, fd int) {
	width := fdset_word_bits(set)
	set.Bits[fd/width] |= 1 << (uint(fd) % uint(width))
}

/* Report whether fd is a member of set.
   The same restrictions on fd apply as for fdset_set. */
func fdset_test(set *syscall.FdSet, fd int) bool {
	width := fdset_word_bits(set)
	return set.Bits[fd/width]&(1<<(uint(fd)%uint(width))) != 0
}
/* Functions to test socket direction. */
42 func dir_is_read(h
*Libnbd
) bool {
43 dir
, _
:= h
.AioGetDirection()
44 return (uint32(dir
) & AIO_DIRECTION_READ
) != 0
46 func dir_is_write(h
*Libnbd
) bool {
47 dir
, _
:= h
.AioGetDirection()
48 return (uint32(dir
) & AIO_DIRECTION_WRITE
) != 0
/* Queue of writes. */
59 /* Called whenever any asynchronous pread command from
60 the source has completed. */
61 func read_completed(buf AioBuffer
, offset
uint64) int {
62 bytes_read
+= buf
.Size
63 /* Move the AIO buffer to the write queue. */
64 writes
= append(writes
, wbuf
{buf
, offset
})
65 /* Returning 1 means the command is automatically retired. */
69 /* Called whenever any asynchronous pwrite command to the
   destination has completed.  Adds the size of the written buffer
   to the bytes_written counter. */
71 func write_completed(buf AioBuffer
) int {
72 bytes_written
+= buf
.Size
73 /* Now we have to manually free the AIO buffer. */
75 /* Returning 1 means the command is automatically retired. */
79 /* Copy between two libnbd handles using asynchronous I/O (AIO).
   Reads are submitted on src with AioPread; their completion callback
   passes the buffer to read_completed.  Every entry of the writes
   queue is submitted to dst with AioPwrite, whose completion callback
   calls write_completed.  The file descriptors returned by AioGetFd
   for both handles are then passed to select(2), in the read and/or
   write set according to dir_is_read and dir_is_write.  Failures are
   reported through t.Fatalf. */
80 func asynch_copy(t
*testing
.T
, src
*Libnbd
, dst
*Libnbd
) {
/* NOTE(review): the size is taken from dst, not src, and the error
   result of GetSize is discarded. */
81 size
, _
:= dst
.GetSize()
83 /* This is our reading position in the source. */
87 /* Number of commands in flight on source and dest handles.
   The error results of AioInFlight are discarded. */
88 src_in_flight
, _
:= src
.AioInFlight()
89 dst_in_flight
, _
:= dst
.AioInFlight()
91 /* We're finished when we've read everything from the
   source and there are no commands in flight. */
93 if soff
>= size
&& src_in_flight
== 0 &&
98 /* If we're able to submit more reads from the
   source then do it now. */
100 if soff
< size
&& src_in_flight
< max_reads_in_flight
{
105 buf
:= MakeAioBuffer(uint(n
))
107 var optargs AioPreadOptargs
108 optargs
.CompletionCallbackSet
= true
/* The callback parameter is named "error", which shadows the
   built-in error type inside the closure.  The value it points to is
   converted to a message with syscall.Errno. */
109 optargs
.CompletionCallback
= func(error
*int) int {
111 err
:= syscall
.Errno(*error
).Error()
114 return read_completed(buf
, soff_copy
)
/* NOTE(review): nothing is assigned from this AioPread call, so any
   value it returns is dropped -- confirm that is intended. */
116 src
.AioPread(buf
, soff
, &optargs
)
120 /* If there are any write commands waiting to
   be issued, send them now. */
122 for _
, wb
:= range writes
{
123 var optargs AioPwriteOptargs
124 optargs
.CompletionCallbackSet
= true
/* NOTE(review): this closure captures the range variable wb.  If the
   callback can run after the loop has moved on and the module targets
   Go older than 1.22, it would see a later queue entry -- confirm. */
125 optargs
.CompletionCallback
= func(error
*int) int {
127 err
:= syscall
.Errno(*error
).Error()
130 return write_completed(wb
.buf
)
132 dst
.AioPwrite(wb
.buf
, wb
.offset
, &optargs
)
136 /* Now poll the file descriptors. */
138 sfd
, err
:= src
.AioGetFd()
140 t
.Fatalf("src.AioGetFd: %s", err
)
145 dfd
, err
:= dst
.AioGetFd()
147 t
.Fatalf("dst.AioGetFd: %s", err
)
/* Read set: each handle's fd is added only if dir_is_read reports
   true for that handle. */
152 var rfds syscall
.FdSet
153 if dir_is_read(src
) {
154 fdset_set(&rfds
, sfd
)
156 if dir_is_read(dst
) {
157 fdset_set(&rfds
, dfd
)
/* Write set: built the same way using dir_is_write. */
159 var wfds syscall
.FdSet
160 if dir_is_write(src
) {
161 fdset_set(&wfds
, sfd
)
163 if dir_is_write(dst
) {
164 fdset_set(&wfds
, dfd
)
/* Both timeout and exception set are nil.  The result is compared
   against EINTR before the failure is reported. */
167 _
, err
= syscall
.Select(nfd
, &rfds
, &wfds
, nil, nil)
168 if err
!= syscall
.EINTR
{
173 t
.Fatalf("select: %s", err
)
/* Check, in this order: src readable, src writable, dst readable,
   dst writable.  Each test requires both the bit in the set filled in
   by select and the direction reported by the handle afterwards.
   Because this is an else-if chain, at most one branch is taken. */
176 if fdset_test(&rfds
, sfd
) && dir_is_read(src
) {
178 } else if fdset_test(&wfds
, sfd
) && dir_is_write(src
) {
180 } else if fdset_test(&rfds
, dfd
) && dir_is_read(dst
) {
182 } else if fdset_test(&wfds
, dfd
) && dir_is_write(dst
) {
/* Test590AioCopy connects a handle named "src" to an nbdkit "pattern"
   plugin (started with "-r") and a handle named "dst" to an nbdkit
   "memory" plugin, both with size=disk_size.  It then runs asynch_copy
   and fails unless bytes_read and bytes_written both equal disk_size. */
188 func Test590AioCopy(t
*testing
.T
) {
191 t
.Fatalf("could not create handle: %s", err
)
194 src
.SetHandleName("src")
199 t
.Fatalf("could not create handle: %s", err
)
202 dst
.SetHandleName("dst")
/* Source: nbdkit is run as a subprocess of this handle. */
204 err
= src
.ConnectCommand([]string{
205 "nbdkit", "-s", "--exit-with-parent", "-r",
206 "pattern", fmt
.Sprintf("size=%d", disk_size
),
209 t
.Fatalf("could not connect: %s", err
)
/* Destination: same command line without "-r", using the "memory"
   plugin. */
212 err
= dst
.ConnectCommand([]string{
213 "nbdkit", "-s", "--exit-with-parent",
214 "memory", fmt
.Sprintf("size=%d", disk_size
),
217 t
.Fatalf("could not connect: %s", err
)
220 asynch_copy(t
, src
, dst
)
/* The counters are package-level variables updated by the completion
   callbacks read_completed and write_completed. */
221 if bytes_read
!= disk_size
{
222 t
.Fatalf("bytes_read != disk_size")
224 if bytes_written
!= disk_size
{
225 t
.Fatalf("bytes_written != disk_size")