2 * Copyright (c) 2010, Linux Box Corporation.
5 * Portions Copyright (c) 2007, Hartmut Reuter,
6 * RZG, Max-Planck-Institut f. Plasmaphysik.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
19 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
20 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
26 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include <afsconfig.h>
32 #include <afs/param.h>
35 #include "rpc_test_procs.h"
38 #include <sys/types.h>
43 #include <sys/param.h>
45 #include <sys/ioctl.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
57 #include <WINNT/afsevent.h>
60 #include <afs/venus.h>
64 #include <afs/afsint.h>
65 #define FSINT_COMMON_XG 1
72 #include <afs/cellconfig.h>
74 #include <afs/com_err.h>
82 #include <sys/malloc.h>
86 #include <afs/errors.h>
87 #include <afs/sys_prototypes.h>
88 #include <rx/rx_prototypes.h>
89 #ifdef AFS_PTHREAD_ENV
93 extern const char *prog
;
94 const int ctx_key
= 1;
97 #define RPC_TEST_GLOBAL_RX_INIT 1
99 #undef RPC_TEST_GLOBAL_RX_INIT
102 const afs_uint32 fs_port
= 7000;
104 typedef struct rpc_test_pkg_params
{
106 pthread_mutexattr_t mtx_attrs
;
107 afs_uint32 cb_next_port
;
109 } rpc_test_pkg_params
;
110 static rpc_test_pkg_params rpc_test_params
;
112 afs_int32
rpc_test_PkgInit()
115 static afs_uint32 rpc_test_initialized
= 0; /* once */
117 if (!rpc_test_initialized
) {
118 rpc_test_initialized
= 1;
120 printf("%s: rpc_test_PkgInit: package already initialized\n");
125 code
= pthread_mutexattr_init(&rpc_test_params
.mtx_attrs
);
127 printf("%s: rpc_test_PkgInit: pthread_mutexattr_init failed\n", prog
);
130 code
= pthread_mutex_init(&rpc_test_params
.mtx
, &rpc_test_params
.mtx_attrs
);
132 printf("%s: rpc_test_PkgInit: pthread_mutex_init failed\n", prog
);
137 /* start connection sequence */
138 rpc_test_params
.next_cno
= 1;
140 /* set the starting port in sequence */
141 rpc_test_params
.cb_next_port
= 7105;
143 #if defined(RPC_TEST_GLOBAL_RX_INIT)
149 } /* rpc_test_PkgInit */
152 init_callback_service_lwp(void *arg
)
154 struct rx_securityClass
*sc
;
155 struct rx_service
*svc
;
158 rpc_test_request_ctx
*ctx
= (rpc_test_request_ctx
*) arg
;
160 printf("%s: init_callback_service_lwp: listen_addr: %s "
161 "(%d) cb_port: %d\n",
162 prog
, ctx
->cb_listen_addr_s
, ctx
->cb_listen_addr
.addr_in
[0],
165 sc
= (struct rx_securityClass
*) rxnull_NewServerSecurityObject();
167 fprintf(stderr
,"rxnull_NewServerSecurityObject failed for callback "
172 #if defined(RPC_TEST_GLOBAL_RX_INIT)
173 svc
= rx_NewServiceHost(htonl(INADDR_ANY
), htons(ctx
->cb_port
), 1,
174 ctx
->cb_svc_name
, &sc
, 1, RXAFSCB_ExecuteRequest
);
176 svc
= rx_NewService(0, 1, ctx
->cb_svc_name
, &sc
, 1, RXAFSCB_ExecuteRequest
);
179 rx_SetServiceSpecific(svc
, ctx_key
, ctx
);
182 fprintf(stderr
,"rx_NewServiceHost failed for callback service\n");
186 /* XXX stash service so we can hijack its rx_socket when inititiating
190 /* release pkg mutex before entering rx processing loop */
191 pthread_mutex_unlock(&rpc_test_params
.mtx
);
195 printf("%s: init_callback_service_lwp: finished");
199 } /* callback_service_lwp */
201 afs_int32
init_callback_service(rpc_test_request_ctx
*ctx
)
204 pthread_attr_t tattr
;
207 afs_uuid_create(&(ctx
->cb_listen_addr
.uuid
));
209 #if !defined(RPC_TEST_GLOBAL_RX_INIT)
211 code
= rx_InitHost(ctx
->cb_listen_addr
.addr_in
[0],
212 (int) htons(ctx
->cb_port
));
214 code
= rx_Init((int) htons(ctx
->cb_port
));
216 #endif /* RPC_TEST_GLOBAL_RX_INIT */
218 assert(pthread_attr_init(&tattr
) == 0);
219 assert(pthread_attr_setdetachstate(&tattr
, PTHREAD_CREATE_DETACHED
) == 0);
220 assert(pthread_create(&tid
, &tattr
, init_callback_service_lwp
, ctx
) == 0);
224 } /* init_callback_service */
226 afs_int32
init_fs_channel(rpc_test_request_ctx
**octx
, char *cb_if
,
227 char *listen_addr_s
, char *prefix
, char *fs_addr_s
,
231 rpc_test_request_ctx
*ctx
;
234 afs_int32 sslen
= sizeof(struct sockaddr
);
237 ctx
= *octx
= (rpc_test_request_ctx
*) malloc(sizeof(rpc_test_request_ctx
));
238 memset(ctx
, 0, sizeof(rpc_test_request_ctx
));
240 /* initialize a local mutex */
241 code
= pthread_mutex_init(&ctx
->mtx
, &rpc_test_params
.mtx_attrs
);
243 /* lock package before rx setup--which has global deps, atm */
244 pthread_mutex_lock(&rpc_test_params
.mtx
);
246 ctx
->cno
= rpc_test_params
.next_cno
++;
249 /* afscbint (server) */
250 sprintf(ctx
->cb_svc_name
, "cb_%d", ctx
->cno
);
251 sprintf(ctx
->cb_if_s
, cb_if
);
252 sprintf(ctx
->cb_listen_addr_s
, listen_addr_s
);
253 sprintf(ctx
->cb_prefix_s
, prefix
);
254 sprintf(ctx
->fs_addr_s
, fs_addr_s
);
256 #if defined(RPC_TEST_ADD_ADDRESSES)
257 #if defined(AFS_LINUX26_ENV)
258 sprintf(cmd
, "ip addr add %s/%s dev %s label %s", listen_addr_s
, prefix
,
262 #endif /* RPC_TEST_ADD_ADDRESSES */
265 pthread_mutex_lock(&ctx
->mtx
);
268 ctx
->cb_port
= rpc_test_params
.cb_next_port
++;
269 ctx
->cb_listen_addr
.numberOfInterfaces
= 1;
272 code
= WSAStringToAddressA(listen_addr_s
, AF_INET
, NULL
,
273 (struct sockaddr
*) &(ctx
->cb_listen_addr
), &sslen
);
275 code
= inet_pton(AF_INET
, listen_addr_s
,
276 (void*) &(ctx
->cb_listen_addr
.addr_in
[0]));
279 code
= init_callback_service(ctx
/* LOCKED, && rpc_test_params->mtx LOCKED */);
284 code
= WSAStringToAddressA(fs_addr_s
, AF_INET
, NULL
,
285 (struct sockaddr
*) &(ctx
->fs_addr
.addr_in
[0]), &sslen
);
287 code
= inet_pton(AF_INET
, fs_addr_s
, (void*) &(ctx
->fs_addr
.addr_in
[0]));
289 ctx
->sc
= rxnull_NewClientSecurityObject();
290 ctx
->sc_index
= RX_SECIDX_NULL
;
291 ctx
->conn
= rx_NewConnection(ctx
->fs_addr
.addr_in
[0], (int) htons(fs_port
),
292 1, ctx
->sc
, ctx
->sc_index
);
295 pthread_mutex_unlock(&ctx
->mtx
);
300 } /* init_fs_channel */
302 /* XXX use the pkg lock to protect the state of rx_socket for
303 * the duration of the call, switching it out for the stashed
304 * rx_socket created by rx_NewService for this channel */
305 #define RXCALL_WITH_SOCK(code, ctx, call) \
307 osi_socket prev_rx_socket; \
308 pthread_mutex_lock(&rpc_test_params.mtx); \
309 prev_rx_socket = rx_socket; \
310 rx_socket = ctx->svc->socket; \
312 rx_socket = prev_rx_socket; \
313 pthread_mutex_unlock(&rpc_test_params.mtx); \
317 rpc_test_afs_fetch_status(rpc_test_request_ctx
*ctx
, AFSFid
*fid
,
318 AFSFetchStatus
*outstatus
)
320 struct rx_call
*tcall
;
321 struct AFSVolSync tsync
;
322 struct AFSCallBack tcb
;
325 RXCALL_WITH_SOCK(code
, ctx
,
326 (RXAFS_FetchStatus(ctx
->conn
, fid
, outstatus
, &tcb
, &tsync
)));
330 } /* rpc_test_afs_fetch_status */
333 rpc_test_afs_store_status(rpc_test_request_ctx
*ctx
, AFSFid
*fid
,
334 AFSStoreStatus
*instatus
, AFSFetchStatus
*outstatus
)
336 struct rx_call
*tcall
;
337 struct AFSVolSync tsync
;
340 RXCALL_WITH_SOCK(code
, ctx
,
341 (RXAFS_StoreStatus(ctx
->conn
, fid
, instatus
, outstatus
, &tsync
)));
345 } /* rpc_test_afs_fetch_status */
347 #if defined(AFS_BYTE_RANGE_FLOCKS)
348 afs_int32
rpc_test_afs_set_byterangelock(rpc_test_request_ctx
*ctx
,
349 AFSByteRangeLock
* lock
)
351 struct rx_call
*tcall
;
354 RXCALL_WITH_SOCK(code
, ctx
,
355 (RXAFS_SetByteRangeLock(ctx
->conn
, lock
)));
359 } /* rpc_test_afs_set_byterangelock */
361 afs_int32
rpc_test_afs_release_byterangelock(rpc_test_request_ctx
*ctx
,
362 AFSByteRangeLock
* lock
)
364 struct rx_call
*tcall
;
367 RXCALL_WITH_SOCK(code
, ctx
,
368 (RXAFS_ReleaseByteRangeLock(ctx
->conn
, lock
)));
372 } /* rpc_test_afs_release_byterangelock */
374 afs_int32
rpc_test_afs_upgrade_byterangelock(rpc_test_request_ctx
*ctx
,
375 AFSByteRangeLock
* lock
)
379 /* TODO: implement */
383 } /* rpc_test_afs_upgrade_byterangelock */
385 afs_int32
rpc_test_afs_downgrade_byterangelock(rpc_test_request_ctx
*ctx
,
386 AFSByteRangeLock
* Lock
)
390 /* TODO: implement */
394 } /* rpc_test_afs_downgrade_byterangelock */
395 #endif /* AFS_BYTE_RANGE_FLOCKS */
398 destroy_fs_channel(rpc_test_request_ctx
*ctx
)
402 #if defined(RPC_TEST_ADD_ADDRESSES)
403 #if defined(AFS_LINUX26_ENV)
404 sprintf(cmd
, "ip addr del %s/%s dev %s label %s", ctx
->cb_listen_addr_s
,
405 ctx
->cb_prefix_s
, ctx
->cb_if_s
, ctx
->cb_if_s
);
408 #endif /* RPC_TEST_ADD_ADDRESSES */
413 } /* destroy_fs_channel */
416 rpc_test_PkgShutdown()
420 } /* rpc_test_PkgShutdown */