Patrick Welche <prlw1@cam.ac.uk>
[netbsd-mini2440.git] / external / bsd / libevent / dist / evrpc.c
blob5630254ed1ec2db3f3ee1b2fb542a417ff81f9a4
1 /* $NetBSD$ */
2 /*
3 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif
32 #ifdef WIN32
33 #define WIN32_LEAN_AND_MEAN
34 #include <winsock2.h>
35 #include <windows.h>
36 #undef WIN32_LEAN_AND_MEAN
37 #endif
39 #include <sys/types.h>
40 #ifndef WIN32
41 #include <sys/socket.h>
42 #endif
43 #ifdef HAVE_SYS_TIME_H
44 #include <sys/time.h>
45 #endif
46 #include <sys/queue.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #ifndef WIN32
50 #include <unistd.h>
51 #endif
52 #include <errno.h>
53 #include <signal.h>
54 #include <string.h>
55 #include <assert.h>
57 #include "event.h"
58 #include "evrpc.h"
59 #include "evrpc-internal.h"
60 #include "evhttp.h"
61 #include "evutil.h"
62 #include "log.h"
64 struct evrpc_base *
65 evrpc_init(struct evhttp *http_server)
67 struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
68 if (base == NULL)
69 return (NULL);
71 /* we rely on the tagging sub system */
72 evtag_init();
74 TAILQ_INIT(&base->registered_rpcs);
75 TAILQ_INIT(&base->input_hooks);
76 TAILQ_INIT(&base->output_hooks);
77 base->http_server = http_server;
79 return (base);
82 void
83 evrpc_free(struct evrpc_base *base)
85 struct evrpc *rpc;
86 struct evrpc_hook *hook;
88 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
89 assert(evrpc_unregister_rpc(base, rpc->uri));
91 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
92 assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
94 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
95 assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
97 free(base);
100 void *
101 evrpc_add_hook(void *vbase,
102 enum EVRPC_HOOK_TYPE hook_type,
103 int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
104 void *cb_arg)
106 struct _evrpc_hooks *base = vbase;
107 struct evrpc_hook_list *head = NULL;
108 struct evrpc_hook *hook = NULL;
109 switch (hook_type) {
110 case EVRPC_INPUT:
111 head = &base->in_hooks;
112 break;
113 case EVRPC_OUTPUT:
114 head = &base->out_hooks;
115 break;
116 default:
117 assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
120 hook = calloc(1, sizeof(struct evrpc_hook));
121 assert(hook != NULL);
123 hook->process = cb;
124 hook->process_arg = cb_arg;
125 TAILQ_INSERT_TAIL(head, hook, next);
127 return (hook);
130 static int
131 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
133 struct evrpc_hook *hook = NULL;
134 TAILQ_FOREACH(hook, head, next) {
135 if (hook == handle) {
136 TAILQ_REMOVE(head, hook, next);
137 free(hook);
138 return (1);
142 return (0);
146 * remove the hook specified by the handle
150 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
152 struct _evrpc_hooks *base = vbase;
153 struct evrpc_hook_list *head = NULL;
154 switch (hook_type) {
155 case EVRPC_INPUT:
156 head = &base->in_hooks;
157 break;
158 case EVRPC_OUTPUT:
159 head = &base->out_hooks;
160 break;
161 default:
162 assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
165 return (evrpc_remove_hook_internal(head, handle));
168 static int
169 evrpc_process_hooks(struct evrpc_hook_list *head,
170 struct evhttp_request *req, struct evbuffer *evbuf)
172 struct evrpc_hook *hook;
173 TAILQ_FOREACH(hook, head, next) {
174 if (hook->process(req, evbuf, hook->process_arg) == -1)
175 return (-1);
178 return (0);
/* Forward declarations for functions that are defined later in this file. */
static void evrpc_pool_schedule(struct evrpc_pool *pool);
static void evrpc_request_cb(struct evhttp_request *req, void *arg);
void evrpc_request_done(struct evrpc_req_generic *rpc_state);
/*
 * Registers a new RPC with the HTTP server.  The evrpc object is expected
 * to have been filled in via the EVRPC_REGISTER_OBJECT macro, which in turn
 * calls evrpc_register_rpc() (defined after the URI helper below).
 */
191 static char *
192 evrpc_construct_uri(const char *uri)
194 char *constructed_uri;
195 int constructed_uri_len;
197 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
198 if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
199 event_err(1, "%s: failed to register rpc at %s",
200 __func__, uri);
201 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
202 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
203 constructed_uri[constructed_uri_len - 1] = '\0';
205 return (constructed_uri);
209 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
210 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
212 char *constructed_uri = evrpc_construct_uri(rpc->uri);
214 rpc->base = base;
215 rpc->cb = cb;
216 rpc->cb_arg = cb_arg;
218 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
220 evhttp_set_cb(base->http_server,
221 constructed_uri,
222 evrpc_request_cb,
223 rpc);
225 free(constructed_uri);
227 return (0);
231 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
233 char *registered_uri = NULL;
234 struct evrpc *rpc;
236 /* find the right rpc; linear search might be slow */
237 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
238 if (strcmp(rpc->uri, name) == 0)
239 break;
241 if (rpc == NULL) {
242 /* We did not find an RPC with this name */
243 return (-1);
245 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
247 free((char *)rpc->uri);
248 free(rpc);
250 registered_uri = evrpc_construct_uri(name);
252 /* remove the http server callback */
253 assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
255 free(registered_uri);
256 return (0);
259 static void
260 evrpc_request_cb(struct evhttp_request *req, void *arg)
262 struct evrpc *rpc = arg;
263 struct evrpc_req_generic *rpc_state = NULL;
265 /* let's verify the outside parameters */
266 if (req->type != EVHTTP_REQ_POST ||
267 EVBUFFER_LENGTH(req->input_buffer) <= 0)
268 goto error;
271 * we might want to allow hooks to suspend the processing,
272 * but at the moment, we assume that they just act as simple
273 * filters.
275 if (evrpc_process_hooks(&rpc->base->input_hooks,
276 req, req->input_buffer) == -1)
277 goto error;
279 rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
280 if (rpc_state == NULL)
281 goto error;
283 /* let's check that we can parse the request */
284 rpc_state->request = rpc->request_new();
285 if (rpc_state->request == NULL)
286 goto error;
288 rpc_state->rpc = rpc;
290 if (rpc->request_unmarshal(
291 rpc_state->request, req->input_buffer) == -1) {
292 /* we failed to parse the request; that's a bummer */
293 goto error;
296 /* at this point, we have a well formed request, prepare the reply */
298 rpc_state->reply = rpc->reply_new();
299 if (rpc_state->reply == NULL)
300 goto error;
302 rpc_state->http_req = req;
303 rpc_state->done = evrpc_request_done;
305 /* give the rpc to the user; they can deal with it */
306 rpc->cb(rpc_state, rpc->cb_arg);
308 return;
310 error:
311 evrpc_reqstate_free(rpc_state);
312 evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
313 return;
316 void
317 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
319 /* clean up all memory */
320 if (rpc_state != NULL) {
321 struct evrpc *rpc = rpc_state->rpc;
323 if (rpc_state->request != NULL)
324 rpc->request_free(rpc_state->request);
325 if (rpc_state->reply != NULL)
326 rpc->reply_free(rpc_state->reply);
327 free(rpc_state);
331 void
332 evrpc_request_done(struct evrpc_req_generic* rpc_state)
334 struct evhttp_request *req = rpc_state->http_req;
335 struct evrpc *rpc = rpc_state->rpc;
336 struct evbuffer* data = NULL;
338 if (rpc->reply_complete(rpc_state->reply) == -1) {
339 /* the reply was not completely filled in. error out */
340 goto error;
343 if ((data = evbuffer_new()) == NULL) {
344 /* out of memory */
345 goto error;
348 /* serialize the reply */
349 rpc->reply_marshal(data, rpc_state->reply);
351 /* do hook based tweaks to the request */
352 if (evrpc_process_hooks(&rpc->base->output_hooks,
353 req, data) == -1)
354 goto error;
356 /* on success, we are going to transmit marshaled binary data */
357 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
358 evhttp_add_header(req->output_headers,
359 "Content-Type", "application/octet-stream");
362 evhttp_send_reply(req, HTTP_OK, "OK", data);
364 evbuffer_free(data);
366 evrpc_reqstate_free(rpc_state);
368 return;
370 error:
371 if (data != NULL)
372 evbuffer_free(data);
373 evrpc_reqstate_free(rpc_state);
374 evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
375 return;
/* Client side of the RPC implementation */
/* Forward declaration; the definition follows later in this file. */
static int evrpc_schedule_request(
    struct evhttp_connection *connection,
    struct evrpc_request_wrapper *ctx);
383 struct evrpc_pool *
384 evrpc_pool_new(struct event_base *base)
386 struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
387 if (pool == NULL)
388 return (NULL);
390 TAILQ_INIT(&pool->connections);
391 TAILQ_INIT(&pool->requests);
393 TAILQ_INIT(&pool->input_hooks);
394 TAILQ_INIT(&pool->output_hooks);
396 pool->base = base;
397 pool->timeout = -1;
399 return (pool);
402 static void
403 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
405 free(request->name);
406 free(request);
409 void
410 evrpc_pool_free(struct evrpc_pool *pool)
412 struct evhttp_connection *connection;
413 struct evrpc_request_wrapper *request;
414 struct evrpc_hook *hook;
416 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
417 TAILQ_REMOVE(&pool->requests, request, next);
418 /* if this gets more complicated we need our own function */
419 evrpc_request_wrapper_free(request);
422 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
423 TAILQ_REMOVE(&pool->connections, connection, next);
424 evhttp_connection_free(connection);
427 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
428 assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
431 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
432 assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
435 free(pool);
439 * Add a connection to the RPC pool. A request scheduled on the pool
440 * may use any available connection.
443 void
444 evrpc_pool_add_connection(struct evrpc_pool *pool,
445 struct evhttp_connection *connection) {
446 assert(connection->http_server == NULL);
447 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
450 * associate an event base with this connection
452 if (pool->base != NULL)
453 evhttp_connection_set_base(connection, pool->base);
456 * unless a timeout was specifically set for a connection,
457 * the connection inherits the timeout from the pool.
459 if (connection->timeout == -1)
460 connection->timeout = pool->timeout;
463 * if we have any requests pending, schedule them with the new
464 * connections.
467 if (TAILQ_FIRST(&pool->requests) != NULL) {
468 struct evrpc_request_wrapper *request =
469 TAILQ_FIRST(&pool->requests);
470 TAILQ_REMOVE(&pool->requests, request, next);
471 evrpc_schedule_request(connection, request);
475 void
476 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
478 struct evhttp_connection *evcon;
479 TAILQ_FOREACH(evcon, &pool->connections, next) {
480 evcon->timeout = timeout_in_secs;
482 pool->timeout = timeout_in_secs;
/* Forward declarations for callbacks that are defined later in this file. */
static void evrpc_reply_done(struct evhttp_request *req, void *arg);
static void evrpc_request_timeout(int fd, short what, void *arg);
490 * Finds a connection object associated with the pool that is currently
491 * idle and can be used to make a request.
493 static struct evhttp_connection *
494 evrpc_pool_find_connection(struct evrpc_pool *pool)
496 struct evhttp_connection *connection;
497 TAILQ_FOREACH(connection, &pool->connections, next) {
498 if (TAILQ_FIRST(&connection->requests) == NULL)
499 return (connection);
502 return (NULL);
506 * We assume that the ctx is no longer queued on the pool.
508 static int
509 evrpc_schedule_request(struct evhttp_connection *connection,
510 struct evrpc_request_wrapper *ctx)
512 struct evhttp_request *req = NULL;
513 struct evrpc_pool *pool = ctx->pool;
514 struct evrpc_status status;
515 char *uri = NULL;
516 int res = 0;
518 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
519 goto error;
521 /* serialize the request data into the output buffer */
522 ctx->request_marshal(req->output_buffer, ctx->request);
524 uri = evrpc_construct_uri(ctx->name);
525 if (uri == NULL)
526 goto error;
528 /* we need to know the connection that we might have to abort */
529 ctx->evcon = connection;
531 /* apply hooks to the outgoing request */
532 if (evrpc_process_hooks(&pool->output_hooks,
533 req, req->output_buffer) == -1)
534 goto error;
536 if (pool->timeout > 0) {
538 * a timeout after which the whole rpc is going to be aborted.
540 struct timeval tv;
541 evutil_timerclear(&tv);
542 tv.tv_sec = pool->timeout;
543 evtimer_add(&ctx->ev_timeout, &tv);
546 /* start the request over the connection */
547 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
548 free(uri);
550 if (res == -1)
551 goto error;
553 return (0);
555 error:
556 memset(&status, 0, sizeof(status));
557 status.error = EVRPC_STATUS_ERR_UNSTARTED;
558 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
559 evrpc_request_wrapper_free(ctx);
560 return (-1);
564 evrpc_make_request(struct evrpc_request_wrapper *ctx)
566 struct evrpc_pool *pool = ctx->pool;
568 /* initialize the event structure for this rpc */
569 evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
570 if (pool->base != NULL)
571 event_base_set(pool->base, &ctx->ev_timeout);
573 /* we better have some available connections on the pool */
574 assert(TAILQ_FIRST(&pool->connections) != NULL);
577 * if no connection is available, we queue the request on the pool,
578 * the next time a connection is empty, the rpc will be send on that.
580 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
582 evrpc_pool_schedule(pool);
584 return (0);
587 static void
588 evrpc_reply_done(struct evhttp_request *req, void *arg)
590 struct evrpc_request_wrapper *ctx = arg;
591 struct evrpc_pool *pool = ctx->pool;
592 struct evrpc_status status;
593 int res = -1;
595 /* cancel any timeout we might have scheduled */
596 event_del(&ctx->ev_timeout);
598 memset(&status, 0, sizeof(status));
599 status.http_req = req;
601 /* we need to get the reply now */
602 if (req != NULL) {
603 /* apply hooks to the incoming request */
604 if (evrpc_process_hooks(&pool->input_hooks,
605 req, req->input_buffer) == -1) {
606 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
607 res = -1;
608 } else {
609 res = ctx->reply_unmarshal(ctx->reply,
610 req->input_buffer);
611 if (res == -1) {
612 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
615 } else {
616 status.error = EVRPC_STATUS_ERR_TIMEOUT;
619 if (res == -1) {
620 /* clear everything that we might have written previously */
621 ctx->reply_clear(ctx->reply);
624 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
626 evrpc_request_wrapper_free(ctx);
628 /* the http layer owns the request structure */
630 /* see if we can schedule another request */
631 evrpc_pool_schedule(pool);
634 static void
635 evrpc_pool_schedule(struct evrpc_pool *pool)
637 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
638 struct evhttp_connection *evcon;
640 /* if no requests are pending, we have no work */
641 if (ctx == NULL)
642 return;
644 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
645 TAILQ_REMOVE(&pool->requests, ctx, next);
646 evrpc_schedule_request(evcon, ctx);
650 static void
651 evrpc_request_timeout(int fd, short what, void *arg)
653 struct evrpc_request_wrapper *ctx = arg;
654 struct evhttp_connection *evcon = ctx->evcon;
655 assert(evcon != NULL);
657 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);