2 * Copyright (c) 2010, Linux Box Corporation.
5 * Portions Copyright (c) 2007, Hartmut Reuter,
6 * RZG, Max-Planck-Institut f. Plasmaphysik.
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
19 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
20 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 * AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
26 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include <afsconfig.h>
32 #include <afs/param.h>
38 #include "rpc_test_procs.h"
42 #include <WINNT/afsevent.h>
45 #include <afs/venus.h>
48 #include <afs/afsint.h>
49 #define FSINT_COMMON_XG 1
53 #include <afs/cellconfig.h>
55 #include <afs/com_err.h>
59 #include <afs/errors.h>
60 #include <afs/sys_prototypes.h>
61 #include <rx/rx_prototypes.h>
62 #ifdef AFS_PTHREAD_ENV
/* Program name, defined elsewhere; it is the first argument of the
 * diagnostic printf calls in this file. */
66 extern const char *prog
;
/* Key passed to rx_SetServiceSpecific() when the request context is
 * attached to the callback service. */
67 const int ctx_key
= 1;
/* NOTE(review): in this view the define below is directly followed by an
 * undef; presumably conditional directives selecting one of the two are
 * not visible here -- confirm. */
70 #define RPC_TEST_GLOBAL_RX_INIT 1
72 #undef RPC_TEST_GLOBAL_RX_INIT
/* File server port; passed through htons() to rx_NewConnection() in
 * init_fs_channel(). */
75 const afs_uint32 fs_port
= 7000;
/* Package-wide state shared by all channels created in this file.
 * NOTE(review): other code in this file also reads and writes .mtx and
 * .next_cno on this structure; those member declarations are not visible
 * in this view. */
77 typedef struct rpc_test_pkg_params
{
/* Mutex attributes initialized in rpc_test_PkgInit() and passed to every
 * pthread_mutex_init() call in this file. */
79 pthread_mutexattr_t mtx_attrs
;
/* Next callback port to hand out: set to 7105 by rpc_test_PkgInit() and
 * post-incremented when init_fs_channel() assigns ctx->cb_port. */
80 afs_uint32 cb_next_port
;
82 } rpc_test_pkg_params
;
/* The single file-local instance of the package state. */
83 static rpc_test_pkg_params rpc_test_params
;
/*
 * rpc_test_PkgInit -- package initialization guarded by a static
 * once-flag.
 *
 * The visible statements set the once-flag, initialize the package mutex
 * attributes and the package mutex, and seed the connection-number
 * sequence (1) and the callback-port sequence (7105).  Diagnostics exist
 * for a repeated initialization and for failure of either pthread call.
 *
 * Returns afs_int32.
 * NOTE(review): the branches around the diagnostics, whatever follows
 * each diagnostic, and the return statement are not visible in this view.
 */
85 afs_int32
rpc_test_PkgInit(void)
88 static afs_uint32 rpc_test_initialized
= 0; /* once */
90 if (!rpc_test_initialized
) {
91 rpc_test_initialized
= 1;
93 printf("%s: rpc_test_PkgInit: package already initialized\n", prog
);
/* The attribute object is initialized first because it is the second
 * argument of the pthread_mutex_init() call that follows. */
98 code
= pthread_mutexattr_init(&rpc_test_params
.mtx_attrs
);
100 printf("%s: rpc_test_PkgInit: pthread_mutexattr_init failed\n", prog
);
103 code
= pthread_mutex_init(&rpc_test_params
.mtx
, &rpc_test_params
.mtx_attrs
);
105 printf("%s: rpc_test_PkgInit: pthread_mutex_init failed\n", prog
);
110 /* start connection sequence */
111 rpc_test_params
.next_cno
= 1;
113 /* set the starting port in sequence */
114 rpc_test_params
.cb_next_port
= 7105;
/* NOTE(review): the code guarded by this conditional and its closing
 * directive are not visible in this view. */
116 #if defined(RPC_TEST_GLOBAL_RX_INIT)
122 } /* rpc_test_PkgInit */
/*
 * init_callback_service_lwp -- thread start routine that
 * init_callback_service() hands to pthread_create().
 *
 * arg is the rpc_test_request_ctx of the channel being set up.  The
 * visible statements log the listen address, create a null server
 * security object, register the callback service under ctx->cb_svc_name
 * with RXAFSCB_ExecuteRequest as the request handler, attach ctx to the
 * service under ctx_key, and release the package mutex.
 *
 * NOTE(review): the package mutex released here is acquired in
 * init_fs_channel(), which runs on the thread that created this one.
 * Confirm that unlocking from a different thread is valid for the mutex
 * type configured through rpc_test_params.mtx_attrs.
 * NOTE(review): the return type, the error branches and the return
 * statement are not visible in this view.
 */
125 init_callback_service_lwp(void *arg
)
127 struct rx_securityClass
*sc
;
128 struct rx_service
*svc
;
130 rpc_test_request_ctx
*ctx
= (rpc_test_request_ctx
*) arg
;
132 printf("%s: init_callback_service_lwp: listen_addr: %s "
133 "(%d) cb_port: %d\n",
134 prog
, ctx
->cb_listen_addr_s
, ctx
->cb_listen_addr
.addr_in
[0],
137 sc
= (struct rx_securityClass
*) rxnull_NewServerSecurityObject();
139 fprintf(stderr
,"rxnull_NewServerSecurityObject failed for callback "
/* Two registration calls are visible: rx_NewServiceHost() with
 * INADDR_ANY and the channel callback port, and rx_NewService() with
 * port 0.  NOTE(review): the directive separating the two alternatives
 * and the closing directive are not visible in this view. */
144 #if defined(RPC_TEST_GLOBAL_RX_INIT)
145 svc
= rx_NewServiceHost(htonl(INADDR_ANY
), htons(ctx
->cb_port
), 1,
146 ctx
->cb_svc_name
, &sc
, 1, RXAFSCB_ExecuteRequest
);
148 svc
= rx_NewService(0, 1, ctx
->cb_svc_name
, &sc
, 1, RXAFSCB_ExecuteRequest
);
/* Attach the request context to the service under ctx_key. */
151 rx_SetServiceSpecific(svc
, ctx_key
, ctx
);
154 fprintf(stderr
,"rx_NewServiceHost failed for callback service\n");
158 /* XXX stash service so we can hijack its rx_socket when initiating
162 /* release pkg mutex before entering rx processing loop */
163 pthread_mutex_unlock(&rpc_test_params
.mtx
);
/* NOTE(review): the message below has no trailing newline. */
167 printf("%s: init_callback_service_lwp: finished", prog
);
171 } /* init_callback_service_lwp */
/*
 * init_callback_service -- start the callback service thread for ctx.
 *
 * The visible statements create a UUID in ctx->cb_listen_addr.uuid,
 * initialize rx on the channel callback port when
 * RPC_TEST_GLOBAL_RX_INIT is not defined, and create a detached thread
 * that runs init_callback_service_lwp(ctx).
 *
 * Returns afs_int32.
 * NOTE(review): the declarations of code and tid, the directives that
 * choose between rx_InitHost() and rx_Init(), and the return statement
 * are not visible in this view.
 */
173 afs_int32
init_callback_service(rpc_test_request_ctx
*ctx
)
176 pthread_attr_t tattr
;
179 afs_uuid_create(&(ctx
->cb_listen_addr
.uuid
));
181 #if !defined(RPC_TEST_GLOBAL_RX_INIT)
183 code
= rx_InitHost(ctx
->cb_listen_addr
.addr_in
[0],
184 (int) htons(ctx
->cb_port
));
186 code
= rx_Init((int) htons(ctx
->cb_port
));
188 #endif /* RPC_TEST_GLOBAL_RX_INIT */
/* NOTE(review): the three pthread calls below are evaluated inside
 * assert().  When the file is compiled with NDEBUG defined, assert()
 * expands to nothing and the calls, including thread creation, are not
 * executed.  Consider calling them outside the assertion and checking
 * the stored result instead. */
190 assert(pthread_attr_init(&tattr
) == 0);
191 assert(pthread_attr_setdetachstate(&tattr
, PTHREAD_CREATE_DETACHED
) == 0);
192 assert(pthread_create(&tid
, &tattr
, init_callback_service_lwp
, ctx
) == 0);
196 } /* init_callback_service */
/*
 * init_fs_channel -- allocate and set up one request context (channel).
 *
 * *octx receives the newly allocated, zero-filled context.  The visible
 * statements then:
 *   - initialize ctx->mtx with the package mutex attributes;
 *   - lock the package mutex and take the next connection number;
 *   - format the callback service name from the connection number and
 *     copy cb_if, listen_addr_s, prefix and fs_addr_s into the context;
 *   - lock ctx->mtx, take the next callback port and record one
 *     interface in ctx->cb_listen_addr;
 *   - convert listen_addr_s, call init_callback_service(ctx), and
 *     convert fs_addr_s;
 *   - create a null client security object and an rx connection to the
 *     file server address on fs_port with service id 1;
 *   - unlock ctx->mtx.
 *
 * Returns afs_int32.
 * NOTE(review): the end of the parameter list, the declarations of code
 * and cmd, any checks of code between the calls, the directives choosing
 * between WSAStringToAddressA() and inet_pton(), and the return
 * statement are not visible in this view.
 */
198 afs_int32
init_fs_channel(rpc_test_request_ctx
**octx
, char *cb_if
,
199 char *listen_addr_s
, char *prefix
, char *fs_addr_s
,
202 rpc_test_request_ctx
*ctx
;
205 afs_int32 sslen
= sizeof(struct sockaddr
);
/* NOTE(review): no NULL check of the malloc() result is visible before
 * the memset() that follows. */
208 ctx
= *octx
= (rpc_test_request_ctx
*) malloc(sizeof(rpc_test_request_ctx
));
209 memset(ctx
, 0, sizeof(rpc_test_request_ctx
));
211 /* initialize a local mutex */
212 code
= pthread_mutex_init(&ctx
->mtx
, &rpc_test_params
.mtx_attrs
);
214 /* lock package before rx setup--which has global deps, atm */
215 pthread_mutex_lock(&rpc_test_params
.mtx
);
217 ctx
->cno
= rpc_test_params
.next_cno
++;
220 /* afscbint (server) */
/* NOTE(review): the sprintf() calls below copy caller-supplied strings
 * without a length bound; the sizes of the destination fields are not
 * visible here, so confirm they cannot overflow. */
221 sprintf(ctx
->cb_svc_name
, "cb_%d", ctx
->cno
);
222 sprintf(ctx
->cb_if_s
, "%s", cb_if
);
223 sprintf(ctx
->cb_listen_addr_s
, "%s", listen_addr_s
);
224 sprintf(ctx
->cb_prefix_s
, "%s", prefix
);
225 sprintf(ctx
->fs_addr_s
, "%s", fs_addr_s
);
227 #if defined(RPC_TEST_ADD_ADDRESSES)
228 #if defined(AFS_LINUX26_ENV)
229 sprintf(cmd
, "ip addr add %s/%s dev %s label %s", listen_addr_s
, prefix
,
233 #endif /* RPC_TEST_ADD_ADDRESSES */
236 pthread_mutex_lock(&ctx
->mtx
);
239 ctx
->cb_port
= rpc_test_params
.cb_next_port
++;
240 ctx
->cb_listen_addr
.numberOfInterfaces
= 1;
/* Convert the textual listen address.  NOTE(review): the
 * WSAStringToAddressA() call writes to the start of ctx->cb_listen_addr,
 * while the inet_pton() call and both file server conversions further
 * down write to an addr_in[0] element -- confirm which destination is
 * intended. */
243 code
= WSAStringToAddressA(listen_addr_s
, AF_INET
, NULL
,
244 (struct sockaddr
*) &(ctx
->cb_listen_addr
), &sslen
);
246 code
= inet_pton(AF_INET
, listen_addr_s
,
247 (void*) &(ctx
->cb_listen_addr
.addr_in
[0]));
250 code
= init_callback_service(ctx
/* ctx->mtx LOCKED, && rpc_test_params.mtx LOCKED */);
/* Convert the textual file server address. */
255 code
= WSAStringToAddressA(fs_addr_s
, AF_INET
, NULL
,
256 (struct sockaddr
*) &(ctx
->fs_addr
.addr_in
[0]), &sslen
);
258 code
= inet_pton(AF_INET
, fs_addr_s
, (void*) &(ctx
->fs_addr
.addr_in
[0]));
/* Client side: null security object and a connection to the file
 * server address on fs_port. */
260 ctx
->sc
= rxnull_NewClientSecurityObject();
261 ctx
->sc_index
= RX_SECIDX_NULL
;
262 ctx
->conn
= rx_NewConnection(ctx
->fs_addr
.addr_in
[0], (int) htons(fs_port
),
263 1, ctx
->sc
, ctx
->sc_index
);
266 pthread_mutex_unlock(&ctx
->mtx
);
270 } /* init_fs_channel */
/*
 * RXCALL_WITH_SOCK(code, ctx, call): the visible lines lock the package
 * mutex, save the global rx_socket, replace it with ctx->svc->socket,
 * and afterwards restore the saved value and unlock the mutex.
 * NOTE(review): the lines that evaluate call and assign code, and the
 * statement wrapper around the body, are not visible in this view.
 * The ctx argument is expanded without parentheses, so callers should
 * pass a plain identifier.
 */
272 /* XXX use the pkg lock to protect the state of rx_socket for
273 * the duration of the call, switching it out for the stashed
274 * rx_socket created by rx_NewService for this channel */
275 #define RXCALL_WITH_SOCK(code, ctx, call) \
277 osi_socket prev_rx_socket; \
278 pthread_mutex_lock(&rpc_test_params.mtx); \
279 prev_rx_socket = rx_socket; \
280 rx_socket = ctx->svc->socket; \
282 rx_socket = prev_rx_socket; \
283 pthread_mutex_unlock(&rpc_test_params.mtx); \
/*
 * rpc_test_afs_fetch_status -- issue RXAFS_FetchStatus for fid on
 * ctx->conn through RXCALL_WITH_SOCK.
 *
 * outstatus is passed straight to the RPC.  The callback and volume sync
 * arguments of the RPC are local variables (tcb, tsync) and are not
 * handed back to the caller.
 * NOTE(review): the return type, the declaration of code and the return
 * statement are not visible in this view.
 */
287 rpc_test_afs_fetch_status(rpc_test_request_ctx
*ctx
, AFSFid
*fid
,
288 AFSFetchStatus
*outstatus
)
290 struct AFSVolSync tsync
;
291 struct AFSCallBack tcb
;
294 RXCALL_WITH_SOCK(code
, ctx
,
295 (RXAFS_FetchStatus(ctx
->conn
, fid
, outstatus
, &tcb
, &tsync
)));
299 } /* rpc_test_afs_fetch_status */
/*
 * rpc_test_afs_store_status -- issue RXAFS_StoreStatus for fid on
 * ctx->conn through RXCALL_WITH_SOCK.
 *
 * instatus and outstatus are passed straight to the RPC.  The volume
 * sync argument of the RPC is a local variable (tsync) and is not handed
 * back to the caller.
 * NOTE(review): the return type, the declaration of code and the return
 * statement are not visible in this view.
 */
302 rpc_test_afs_store_status(rpc_test_request_ctx
*ctx
, AFSFid
*fid
,
303 AFSStoreStatus
*instatus
, AFSFetchStatus
*outstatus
)
305 struct AFSVolSync tsync
;
308 RXCALL_WITH_SOCK(code
, ctx
,
309 (RXAFS_StoreStatus(ctx
->conn
, fid
, instatus
, outstatus
, &tsync
)));
313 } /* rpc_test_afs_store_status */
315 #if defined(AFS_BYTE_RANGE_FLOCKS)
/*
 * rpc_test_afs_set_byterangelock -- issue RXAFS_SetByteRangeLock for
 * lock on ctx->conn through RXCALL_WITH_SOCK.
 *
 * Returns afs_int32.
 * NOTE(review): no use of the local tcall is visible in this view; the
 * declaration of code and the return statement are not visible either.
 */
316 afs_int32
rpc_test_afs_set_byterangelock(rpc_test_request_ctx
*ctx
,
317 AFSByteRangeLock
* lock
)
319 struct rx_call
*tcall
;
322 RXCALL_WITH_SOCK(code
, ctx
,
323 (RXAFS_SetByteRangeLock(ctx
->conn
, lock
)));
327 } /* rpc_test_afs_set_byterangelock */
/*
 * rpc_test_afs_release_byterangelock -- issue RXAFS_ReleaseByteRangeLock
 * for lock on ctx->conn through RXCALL_WITH_SOCK.
 *
 * Returns afs_int32.
 * NOTE(review): no use of the local tcall is visible in this view; the
 * declaration of code and the return statement are not visible either.
 */
329 afs_int32
rpc_test_afs_release_byterangelock(rpc_test_request_ctx
*ctx
,
330 AFSByteRangeLock
* lock
)
332 struct rx_call
*tcall
;
335 RXCALL_WITH_SOCK(code
, ctx
,
336 (RXAFS_ReleaseByteRangeLock(ctx
->conn
, lock
)));
340 } /* rpc_test_afs_release_byterangelock */
/*
 * rpc_test_afs_upgrade_byterangelock -- placeholder; the only visible
 * body content is the TODO marker.
 *
 * Returns afs_int32.
 * NOTE(review): the value returned is not visible in this view.
 */
342 afs_int32
rpc_test_afs_upgrade_byterangelock(rpc_test_request_ctx
*ctx
,
343 AFSByteRangeLock
* lock
)
347 /* TODO: implement */
351 } /* rpc_test_afs_upgrade_byterangelock */
/*
 * rpc_test_afs_downgrade_byterangelock -- placeholder; the only visible
 * body content is the TODO marker.
 *
 * Returns afs_int32.
 * NOTE(review): the value returned is not visible in this view.  The
 * second parameter is spelled Lock here, while the sibling byte range
 * lock functions spell it lock.
 */
353 afs_int32
rpc_test_afs_downgrade_byterangelock(rpc_test_request_ctx
*ctx
,
354 AFSByteRangeLock
* Lock
)
358 /* TODO: implement */
362 } /* rpc_test_afs_downgrade_byterangelock */
363 #endif /* AFS_BYTE_RANGE_FLOCKS */
/*
 * destroy_fs_channel -- tear down a channel created by
 * init_fs_channel().
 *
 * The only visible statement formats an ip addr del command from the
 * listen address, prefix and interface name stored in ctx, and it is
 * compiled only when both RPC_TEST_ADD_ADDRESSES and AFS_LINUX26_ENV are
 * defined.  ctx->cb_if_s is supplied for both the dev and the label
 * fields of the command.
 * NOTE(review): the return type, the declaration of cmd, what is done
 * with cmd after formatting, and any release of ctx resources are not
 * visible in this view.
 */
366 destroy_fs_channel(rpc_test_request_ctx
*ctx
)
369 #if defined(RPC_TEST_ADD_ADDRESSES)
370 #if defined(AFS_LINUX26_ENV)
371 sprintf(cmd
, "ip addr del %s/%s dev %s label %s", ctx
->cb_listen_addr_s
,
372 ctx
->cb_prefix_s
, ctx
->cb_if_s
, ctx
->cb_if_s
);
375 #endif /* RPC_TEST_ADD_ADDRESSES */
380 } /* destroy_fs_channel */
/*
 * rpc_test_PkgShutdown -- package shutdown entry point; takes no
 * arguments.
 * NOTE(review): the return type and the body are not visible in this
 * view.
 */
383 rpc_test_PkgShutdown(void)
385 } /* rpc_test_PkgShutdown */