Expand PMF_FN_* macros.
[netbsd-mini2440.git] / sys / rump / librump / rumpkern / sysproxy_socket.c
blob208fc5624138c719a6e12c9d6d4996989b142572
/*	$NetBSD: sysproxy_socket.c,v 1.4 2009/10/14 18:18:53 pooka Exp $	*/

/*
 * Copyright (c) 2009 Antti Kantee.  All Rights Reserved.
 *
 * Development of this software was supported by The Nokia Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
30 #include <sys/cdefs.h>
31 __KERNEL_RCSID(0, "$NetBSD: sysproxy_socket.c,v 1.4 2009/10/14 18:18:53 pooka Exp $");
33 #include <sys/param.h>
34 #include <sys/kernel.h>
35 #include <sys/kmem.h>
36 #include <sys/kthread.h>
37 #include <sys/queue.h>
38 #include <sys/syscall.h>
39 #include <sys/atomic.h>
41 #include <rump/rump.h>
42 #include <rump/rumpuser.h>
44 #include "rump_private.h"
/*
 * This is a very simple RPC mechanism.  It defines:
 *
 * 1) system call
 * 2) copyin
 * 3) copyout
 *
 * Currently all the data is in host format, so this can't really be
 * used to make cross-site calls.  Extending to something like XMLRPC
 * is possible, but takes some amount of programming effort, since all
 * the kernel data structures and types are defined in C.
 *
 * XXX: yes, this is overall implemented in a very lazy fashion
 * currently.  Hopefully it will get better.  And hopefully the
 * global "sock" will go away, it's quite disgusting.
 */
/*
 * RPC frame types.  Each request type is immediately followed by its
 * matching response type, so requests are even and responses odd.
 */
enum rumprpc {
	RUMPRPC_SYSCALL,	RUMPRPC_SYSCALL_RESP,
	RUMPRPC_COPYIN,		RUMPRPC_COPYIN_RESP,
	RUMPRPC_COPYOUT,	RUMPRPC_COPYOUT_RESP
};
/*
 * Common header present at the start of every RPC frame.
 */
struct rumprpc_head {
	uint64_t rpch_flen;	/* total frame length, header included */
	uint32_t rpch_reqno;	/* request number; response carries same value */
	int rpch_type;		/* enum rumprpc */
};
73 struct rumprpc_sysreq {
74 struct rumprpc_head rpc_head;
75 int rpc_sysnum;
76 uint8_t rpc_data[0];
79 struct rumprpc_sysresp {
80 struct rumprpc_head rpc_head;
81 int rpc_error;
82 register_t rpc_retval;
85 struct rumprpc_copydata {
86 struct rumprpc_head rpc_head;
87 void *rpc_addr;
88 size_t rpc_len;
89 uint8_t rpc_data[0];
92 struct sysproxy_qent {
93 uint32_t reqno;
94 struct rumprpc_head *rpch_resp;
96 kcondvar_t cv;
97 TAILQ_ENTRY(sysproxy_qent) entries;
100 static TAILQ_HEAD(, sysproxy_qent) sendq = TAILQ_HEAD_INITIALIZER(sendq);
101 static TAILQ_HEAD(, sysproxy_qent) recvq = TAILQ_HEAD_INITIALIZER(recvq);
102 static bool sendavail = true;
103 static kmutex_t sendmtx, recvmtx;
104 static unsigned reqno;
106 static struct sysproxy_qent *
107 get_qent(void)
109 struct sysproxy_qent *qent;
110 unsigned myreq = atomic_inc_uint_nv(&reqno);
112 qent = kmem_alloc(sizeof(*qent), KM_SLEEP);
114 qent->reqno = myreq;
115 qent->rpch_resp = NULL;
116 cv_init(&qent->cv, "sproxyq");
118 return qent;
121 static void
122 put_qent(struct sysproxy_qent *qent)
125 cv_destroy(&qent->cv);
126 kmem_free(qent, sizeof(*qent));
129 static int
130 write_n(int s, uint8_t *data, size_t dlen)
132 ssize_t n;
133 size_t done;
134 int error;
136 error = 0;
137 for (done = 0; done < dlen; done += n) {
138 n = rumpuser_write(s, data + done, dlen - done, &error);
139 if (n <= 0) {
140 if (n == -1)
141 return error;
142 return ECONNRESET;
146 return error;
149 static int
150 read_n(int s, uint8_t *data, size_t dlen)
152 ssize_t n;
153 size_t done;
154 int error;
156 error = 0;
157 for (done = 0; done < dlen; done += n) {
158 n = rumpuser_read(s, data + done, dlen - done, &error);
159 if (n <= 0) {
160 if (n == -1)
161 return error;
162 return ECONNRESET;
166 return error;
169 static void
170 dosend(int s, struct sysproxy_qent *qent, uint8_t *data, size_t len, bool rq)
172 int error;
175 * Send. If the sendq is empty, we send in current thread context.
176 * Otherwise we enqueue ourselves and block until we are able to
177 * send in current thread context, i.e. we are the first in the queue.
179 mutex_enter(&sendmtx);
180 if (!sendavail) {
181 TAILQ_INSERT_TAIL(&sendq, qent, entries);
182 while (qent != TAILQ_FIRST(&sendq))
183 cv_wait(&qent->cv, &sendmtx);
184 KASSERT(qent == TAILQ_FIRST(&sendq));
185 TAILQ_REMOVE(&sendq, qent, entries);
187 sendavail = false;
188 mutex_exit(&sendmtx);
191 * Put ourselves onto the receive queue already now in case
192 * the response arrives "immediately" after sending.
194 if (rq) {
195 mutex_enter(&recvmtx);
196 TAILQ_INSERT_TAIL(&recvq, qent, entries);
197 mutex_exit(&recvmtx);
200 /* XXX: need better error recovery */
201 if ((error = write_n(s, data, len)) != 0)
202 panic("unrecoverable error %d (sloppy)", error);
205 struct wrk {
206 void (*wfn)(void *arg);
207 void *arg;
208 kcondvar_t wrkcv;
209 LIST_ENTRY(wrk) entries;
212 static LIST_HEAD(, wrk) idlewrker = LIST_HEAD_INITIALIZER(idlewrker);
214 #define NIDLE_MAX 5
216 static kmutex_t wrkmtx;
217 static unsigned nwrk, nidle;
220 * workers for handling requests. comes with simple little pooling
221 * for threads.
223 static void
224 wrkthread(void *arg)
226 struct wrk *wrk = arg;
228 mutex_enter(&wrkmtx);
229 nidle++;
230 for (;;) {
231 while (wrk->wfn == NULL)
232 cv_wait(&wrk->wrkcv, &wrkmtx);
233 nidle--;
234 mutex_exit(&wrkmtx);
236 wrk->wfn(wrk->arg);
237 wrk->wfn = NULL;
238 wrk->arg = NULL;
240 mutex_enter(&wrkmtx);
241 if (++nidle > NIDLE_MAX) {
242 nidle--;
243 break;
245 LIST_INSERT_HEAD(&idlewrker, wrk, entries);
247 nwrk--;
248 mutex_exit(&wrkmtx);
250 cv_destroy(&wrk->wrkcv);
251 kmem_free(wrk, sizeof(*wrk));
252 kthread_exit(0);
256 * Enqueue work into a separate thread context. Will create a new
257 * thread if there are none available.
259 static void
260 wrkenqueue(void (*wfn)(void *), void *arg)
262 struct wrk *wrk;
263 int error;
265 mutex_enter(&wrkmtx);
266 if (nidle == 0) {
267 nwrk++;
268 if (nwrk > 30)
269 printf("syscall proxy warning: over 30 workers\n");
270 mutex_exit(&wrkmtx);
271 wrk = kmem_zalloc(sizeof(*wrk), KM_SLEEP);
272 cv_init(&wrk->wrkcv, "sproxywrk");
273 retry:
274 error = kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL,
275 wrkthread, wrk, NULL, "spw_%d", nwrk);
276 if (error) {
277 printf("kthread_create failed: %d. retry.\n", error);
278 kpause("eagain", false, hz, NULL);
279 goto retry;
281 } else {
282 wrk = LIST_FIRST(&idlewrker);
283 LIST_REMOVE(wrk, entries);
284 mutex_exit(&wrkmtx);
287 wrk->wfn = wfn;
288 wrk->arg = arg;
290 mutex_enter(&wrkmtx);
291 cv_signal(&wrk->wrkcv);
292 mutex_exit(&wrkmtx);
295 static int sock; /* XXXXXX */
298 rump_sysproxy_copyout(const void *kaddr, void *uaddr, size_t len)
300 struct rumprpc_copydata *req;
301 struct sysproxy_qent *qent = get_qent();
302 size_t totlen = sizeof(*req) + len;
304 req = kmem_alloc(totlen, KM_SLEEP);
306 req->rpc_head.rpch_flen = totlen;
307 req->rpc_head.rpch_reqno = qent->reqno;
308 req->rpc_head.rpch_type = RUMPRPC_COPYOUT;
309 req->rpc_addr = uaddr;
310 req->rpc_len = len;
311 memcpy(req->rpc_data, kaddr, len);
313 /* XXX: handle async? */
314 dosend(sock, qent, (uint8_t *)req, totlen, false);
315 kmem_free(req, totlen);
316 put_qent(qent);
318 return 0;
322 rump_sysproxy_copyin(const void *uaddr, void *kaddr, size_t len)
324 struct sysproxy_qent *qent = get_qent();
325 struct rumprpc_copydata req, *resp;
327 /* build request */
328 req.rpc_head.rpch_flen = sizeof(req);
329 req.rpc_head.rpch_reqno = qent->reqno;
330 req.rpc_head.rpch_type = RUMPRPC_COPYIN;
332 req.rpc_addr = __UNCONST(uaddr);
333 req.rpc_len = len;
335 dosend(sock, qent, (uint8_t *)&req, sizeof(req), true);
338 * Wake up next sender or just toggle availability
340 mutex_enter(&sendmtx);
341 if (TAILQ_EMPTY(&sendq)) {
342 sendavail = true;
343 } else {
344 cv_signal(&TAILQ_FIRST(&sendq)->cv);
346 mutex_exit(&sendmtx);
348 /* Wait for response */
349 mutex_enter(&recvmtx);
350 while (qent->rpch_resp == NULL)
351 cv_wait(&qent->cv, &recvmtx);
352 mutex_exit(&recvmtx);
354 resp = (struct rumprpc_copydata *)qent->rpch_resp;
355 /* we trust our kernel */
356 KASSERT(resp->rpc_head.rpch_type == RUMPRPC_COPYIN_RESP);
358 memcpy(kaddr, resp->rpc_data, len);
359 kmem_free(resp, resp->rpc_head.rpch_flen);
360 put_qent(qent);
362 return 0;
365 struct vmspace rump_sysproxy_vmspace;
367 static void
368 handle_syscall(void *arg)
370 struct rumprpc_sysreq *req = arg;
371 struct sysproxy_qent *qent = get_qent();
372 struct rumprpc_sysresp resp;
373 struct sysent *callp;
374 struct lwp *mylwp, *l;
376 resp.rpc_head.rpch_flen = sizeof(resp);
377 resp.rpc_head.rpch_reqno = req->rpc_head.rpch_reqno;
378 resp.rpc_head.rpch_type = RUMPRPC_SYSCALL_RESP;
380 if (__predict_false(req->rpc_sysnum >= SYS_NSYSENT)) {
381 resp.rpc_error = ENOSYS;
382 dosend(sock, qent, (uint8_t *)&resp, sizeof(resp), false);
383 kmem_free(req, req->rpc_head.rpch_flen);
384 put_qent(qent);
385 return;
388 callp = rump_sysent + req->rpc_sysnum;
389 mylwp = curlwp;
390 l = rump_newproc_switch();
391 rump_set_vmspace(&rump_sysproxy_vmspace);
393 resp.rpc_retval = 0; /* default */
394 resp.rpc_error = callp->sy_call(l, (void *)req->rpc_data,
395 &resp.rpc_retval);
396 rump_lwp_release(l);
397 rump_lwp_switch(mylwp);
398 kmem_free(req, req->rpc_head.rpch_flen);
400 dosend(sock, qent, (uint8_t *)&resp, sizeof(resp), false);
401 put_qent(qent);
404 static void
405 handle_copyin(void *arg)
407 struct rumprpc_copydata *req = arg;
408 struct sysproxy_qent *qent = get_qent();
409 struct rumprpc_copydata *resp;
410 size_t totlen = sizeof(*resp) + req->rpc_len;
412 resp = kmem_alloc(totlen, KM_SLEEP);
413 resp->rpc_head.rpch_flen = totlen;
414 resp->rpc_head.rpch_reqno = req->rpc_head.rpch_reqno;
415 resp->rpc_head.rpch_type = RUMPRPC_COPYIN_RESP;
416 memcpy(resp->rpc_data, req->rpc_addr, req->rpc_len);
417 kmem_free(req, req->rpc_head.rpch_flen);
419 dosend(sock, qent, (uint8_t *)resp, totlen, false);
420 kmem_free(resp, totlen);
421 put_qent(qent);
425 * Client side. We can either get the results of an earlier syscall or
426 * get a request for a copyin/out.
428 static void
429 recvthread(void *arg)
431 struct rumprpc_head rpch;
432 uint8_t *rpc;
433 int s = (uintptr_t)arg;
434 int error;
436 for (;;) {
437 error = read_n(s, (uint8_t *)&rpch, sizeof(rpch));
438 if (error)
439 panic("%d", error);
441 rpc = kmem_alloc(rpch.rpch_flen , KM_SLEEP);
442 error = read_n(s, rpc + sizeof(struct rumprpc_head),
443 rpch.rpch_flen - sizeof(struct rumprpc_head));
444 if (error)
445 panic("%d", error);
446 memcpy(rpc, &rpch, sizeof(rpch));
448 switch (rpch.rpch_type) {
449 case RUMPRPC_SYSCALL:
450 /* assert server */
451 wrkenqueue(handle_syscall, rpc);
452 break;
454 case RUMPRPC_SYSCALL_RESP:
455 /* assert client */
457 struct sysproxy_qent *qent;
459 mutex_enter(&recvmtx);
460 TAILQ_FOREACH(qent, &recvq, entries)
461 if (qent->reqno == rpch.rpch_reqno)
462 break;
463 if (!qent) {
464 mutex_exit(&recvmtx);
465 kmem_free(rpc, rpch.rpch_flen);
466 break;
468 TAILQ_REMOVE(&recvq, qent, entries);
469 qent->rpch_resp = (void *)rpc;
470 cv_signal(&qent->cv);
471 mutex_exit(&recvmtx);
473 break;
475 case RUMPRPC_COPYIN:
476 /* assert client */
477 wrkenqueue(handle_copyin, rpc);
478 break;
480 case RUMPRPC_COPYIN_RESP:
481 /* assert server */
483 struct sysproxy_qent *qent;
485 mutex_enter(&recvmtx);
486 TAILQ_FOREACH(qent, &recvq, entries)
487 if (qent->reqno == rpch.rpch_reqno)
488 break;
489 if (!qent) {
490 mutex_exit(&recvmtx);
491 kmem_free(rpc, rpch.rpch_flen);
492 break;
494 TAILQ_REMOVE(&recvq, qent, entries);
495 qent->rpch_resp = (void *)rpc;
496 cv_signal(&qent->cv);
497 mutex_exit(&recvmtx);
499 break;
502 case RUMPRPC_COPYOUT:
504 struct rumprpc_copydata *req = (void *)rpc;
506 memcpy(req->rpc_addr, req->rpc_data, req->rpc_len);
507 kmem_free(req, req->rpc_head.rpch_flen);
508 break;
511 default:
512 printf("invalid type %d\n", rpch.rpch_type);
518 * Make a syscall to a remote site over a socket. I'm not really sure
519 * if this should use kernel or user networking. Currently it uses
520 * user networking, but could be changed.
522 static int
523 rump_sysproxy_socket(int num, void *arg, uint8_t *data, size_t dlen,
524 register_t *retval)
526 struct sysproxy_qent *qent;
527 struct rumprpc_sysreq *call;
528 struct rumprpc_sysresp *resp;
529 size_t totlen = sizeof(*call) + dlen;
530 int s = (uintptr_t)arg;
531 int error;
533 qent = get_qent();
535 /* build request */
536 /* should we prefer multiple writes if dlen > magic_constant? */
537 call = kmem_alloc(totlen, KM_SLEEP);
538 call->rpc_head.rpch_flen = totlen;
539 call->rpc_head.rpch_reqno = qent->reqno;
540 call->rpc_head.rpch_type = RUMPRPC_SYSCALL;
541 call->rpc_sysnum = num;
542 memcpy(call->rpc_data, data, dlen);
544 dosend(s, qent, (uint8_t *)call, totlen, true);
545 kmem_free(call, totlen);
548 * Wake up next sender or just toggle availability
550 mutex_enter(&sendmtx);
551 if (TAILQ_EMPTY(&sendq)) {
552 sendavail = true;
553 } else {
554 cv_signal(&TAILQ_FIRST(&sendq)->cv);
556 mutex_exit(&sendmtx);
558 /* Wait for response */
559 mutex_enter(&recvmtx);
560 while (qent->rpch_resp == NULL)
561 cv_wait(&qent->cv, &recvmtx);
562 mutex_exit(&recvmtx);
564 resp = (struct rumprpc_sysresp *)qent->rpch_resp;
565 /* we trust our kernel */
566 KASSERT(resp->rpc_head.rpch_type == RUMPRPC_SYSCALL_RESP);
568 *retval = resp->rpc_retval;
569 error = resp->rpc_error;
571 kmem_free(resp, resp->rpc_head.rpch_flen);
572 put_qent(qent);
574 return error;
578 rump_sysproxy_socket_setup_client(int s)
580 int error;
582 error = kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL,
583 recvthread, (void *)(uintptr_t)s, NULL, "sysproxy_recv");
584 if (error)
585 return error;
587 mutex_init(&wrkmtx, MUTEX_DEFAULT, IPL_NONE);
588 mutex_init(&sendmtx, MUTEX_DEFAULT, IPL_NONE);
589 mutex_init(&recvmtx, MUTEX_DEFAULT, IPL_NONE);
590 error = rump_sysproxy_set(rump_sysproxy_socket,
591 (void *)(uintptr_t)s);
592 /* XXX: handle */
594 sock = s;
596 return error;
600 rump_sysproxy_socket_setup_server(int s)
602 int error;
604 error = kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL,
605 recvthread, (void *)(uintptr_t)s, NULL, "sysproxy_recv");
606 if (error)
607 return error;
609 mutex_init(&wrkmtx, MUTEX_DEFAULT, IPL_NONE);
610 mutex_init(&recvmtx, MUTEX_DEFAULT, IPL_NONE);
611 mutex_init(&sendmtx, MUTEX_DEFAULT, IPL_NONE);
613 sock = s;
615 return 0;