Drop main() prototype. Syncs with NetBSD-8
[minix.git] / external / bsd / libevent / dist / evrpc.c
blob11b84f54c371854294d98fc96eebaaa3ed141393
1 /* $NetBSD: evrpc.c,v 1.3 2015/01/29 07:26:02 spz Exp $ */
2 /*
3 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
4 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include "event2/event-config.h"
29 #include <sys/cdefs.h>
30 __RCSID("$NetBSD: evrpc.c,v 1.3 2015/01/29 07:26:02 spz Exp $");
32 #ifdef WIN32
33 #define WIN32_LEAN_AND_MEAN
34 #include <winsock2.h>
35 #include <windows.h>
36 #undef WIN32_LEAN_AND_MEAN
37 #endif
39 #include <sys/types.h>
40 #ifndef WIN32
41 #include <sys/socket.h>
42 #endif
43 #ifdef _EVENT_HAVE_SYS_TIME_H
44 #include <sys/time.h>
45 #endif
46 #include <sys/queue.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #ifndef WIN32
50 #include <unistd.h>
51 #endif
52 #include <errno.h>
53 #include <signal.h>
54 #include <string.h>
56 #include <sys/queue.h>
58 #include "event2/event.h"
59 #include "event2/event_struct.h"
60 #include "event2/rpc.h"
61 #include "event2/rpc_struct.h"
62 #include "evrpc-internal.h"
63 #include "event2/http.h"
64 #include "event2/buffer.h"
65 #include "event2/tag.h"
66 #include "event2/http_struct.h"
67 #include "event2/http_compat.h"
68 #include "event2/util.h"
69 #include "util-internal.h"
70 #include "log-internal.h"
71 #include "mm-internal.h"
73 struct evrpc_base *
74 evrpc_init(struct evhttp *http_server)
76 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
77 if (base == NULL)
78 return (NULL);
80 /* we rely on the tagging sub system */
81 evtag_init();
83 TAILQ_INIT(&base->registered_rpcs);
84 TAILQ_INIT(&base->input_hooks);
85 TAILQ_INIT(&base->output_hooks);
87 TAILQ_INIT(&base->paused_requests);
89 base->http_server = http_server;
91 return (base);
94 void
95 evrpc_free(struct evrpc_base *base)
97 struct evrpc *rpc;
98 struct evrpc_hook *hook;
99 struct evrpc_hook_ctx *pause;
100 int r;
102 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
103 r = evrpc_unregister_rpc(base, rpc->uri);
104 EVUTIL_ASSERT(r == 0);
106 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
107 TAILQ_REMOVE(&base->paused_requests, pause, next);
108 mm_free(pause);
110 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
111 r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
112 EVUTIL_ASSERT(r);
114 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
115 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
116 EVUTIL_ASSERT(r);
118 mm_free(base);
121 void *
122 evrpc_add_hook(void *vbase,
123 enum EVRPC_HOOK_TYPE hook_type,
124 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
125 void *cb_arg)
127 struct _evrpc_hooks *base = vbase;
128 struct evrpc_hook_list *head = NULL;
129 struct evrpc_hook *hook = NULL;
130 switch (hook_type) {
131 case EVRPC_INPUT:
132 head = &base->in_hooks;
133 break;
134 case EVRPC_OUTPUT:
135 head = &base->out_hooks;
136 break;
137 default:
138 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
141 hook = mm_calloc(1, sizeof(struct evrpc_hook));
142 EVUTIL_ASSERT(hook != NULL);
144 hook->process = cb;
145 hook->process_arg = cb_arg;
146 TAILQ_INSERT_TAIL(head, hook, next);
148 return (hook);
151 static int
152 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
154 struct evrpc_hook *hook = NULL;
155 TAILQ_FOREACH(hook, head, next) {
156 if (hook == handle) {
157 TAILQ_REMOVE(head, hook, next);
158 mm_free(hook);
159 return (1);
163 return (0);
167 * remove the hook specified by the handle
171 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
173 struct _evrpc_hooks *base = vbase;
174 struct evrpc_hook_list *head = NULL;
175 switch (hook_type) {
176 case EVRPC_INPUT:
177 head = &base->in_hooks;
178 break;
179 case EVRPC_OUTPUT:
180 head = &base->out_hooks;
181 break;
182 default:
183 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
186 return (evrpc_remove_hook_internal(head, handle));
189 static int
190 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
191 struct evhttp_request *req, struct evbuffer *evbuf)
193 struct evrpc_hook *hook;
194 TAILQ_FOREACH(hook, head, next) {
195 int res = hook->process(ctx, req, evbuf, hook->process_arg);
196 if (res != EVRPC_CONTINUE)
197 return (res);
200 return (EVRPC_CONTINUE);
203 static void evrpc_pool_schedule(struct evrpc_pool *pool);
204 static void evrpc_request_cb(struct evhttp_request *, void *);
207 * Registers a new RPC with the HTTP server. The evrpc object is expected
208 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
209 * calls this function.
212 static char *
213 evrpc_construct_uri(const char *uri)
215 char *constructed_uri;
216 size_t constructed_uri_len;
218 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
219 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
220 event_err(1, "%s: failed to register rpc at %s",
221 __func__, uri);
222 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
223 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
224 constructed_uri[constructed_uri_len - 1] = '\0';
226 return (constructed_uri);
230 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
231 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
233 char *constructed_uri = evrpc_construct_uri(rpc->uri);
235 rpc->base = base;
236 rpc->cb = cb;
237 rpc->cb_arg = cb_arg;
239 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
241 evhttp_set_cb(base->http_server,
242 constructed_uri,
243 evrpc_request_cb,
244 rpc);
246 mm_free(constructed_uri);
248 return (0);
252 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
254 char *registered_uri = NULL;
255 struct evrpc *rpc;
256 int r;
258 /* find the right rpc; linear search might be slow */
259 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
260 if (strcmp(rpc->uri, name) == 0)
261 break;
263 if (rpc == NULL) {
264 /* We did not find an RPC with this name */
265 return (-1);
267 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
269 registered_uri = evrpc_construct_uri(name);
271 /* remove the http server callback */
272 r = evhttp_del_cb(base->http_server, registered_uri);
273 EVUTIL_ASSERT(r == 0);
275 mm_free(registered_uri);
277 mm_free(__UNCONST(rpc->uri));
278 mm_free(rpc);
279 return (0);
282 static int evrpc_pause_request(void *vbase, void *ctx,
283 void (*cb)(void *, enum EVRPC_HOOK_RESULT));
284 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
286 static void
287 evrpc_request_cb(struct evhttp_request *req, void *arg)
289 struct evrpc *rpc = arg;
290 struct evrpc_req_generic *rpc_state = NULL;
292 /* let's verify the outside parameters */
293 if (req->type != EVHTTP_REQ_POST ||
294 evbuffer_get_length(req->input_buffer) <= 0)
295 goto error;
297 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
298 if (rpc_state == NULL)
299 goto error;
300 rpc_state->rpc = rpc;
301 rpc_state->http_req = req;
302 rpc_state->rpc_data = NULL;
304 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
305 int hook_res;
307 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
310 * allow hooks to modify the outgoing request
312 hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
313 rpc_state, req, req->input_buffer);
314 switch (hook_res) {
315 case EVRPC_TERMINATE:
316 goto error;
317 case EVRPC_PAUSE:
318 evrpc_pause_request(rpc->base, rpc_state,
319 evrpc_request_cb_closure);
320 return;
321 case EVRPC_CONTINUE:
322 break;
323 default:
324 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
325 hook_res == EVRPC_CONTINUE ||
326 hook_res == EVRPC_PAUSE);
330 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
331 return;
333 error:
334 if (rpc_state != NULL)
335 evrpc_reqstate_free(rpc_state);
336 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
337 return;
340 static void
341 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
343 struct evrpc_req_generic *rpc_state = arg;
344 struct evrpc *rpc;
345 struct evhttp_request *req;
347 EVUTIL_ASSERT(rpc_state);
348 rpc = rpc_state->rpc;
349 req = rpc_state->http_req;
351 if (hook_res == EVRPC_TERMINATE)
352 goto error;
354 /* let's check that we can parse the request */
355 rpc_state->request = rpc->request_new(rpc->request_new_arg);
356 if (rpc_state->request == NULL)
357 goto error;
359 if (rpc->request_unmarshal(
360 rpc_state->request, req->input_buffer) == -1) {
361 /* we failed to parse the request; that's a bummer */
362 goto error;
365 /* at this point, we have a well formed request, prepare the reply */
367 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
368 if (rpc_state->reply == NULL)
369 goto error;
371 /* give the rpc to the user; they can deal with it */
372 rpc->cb(rpc_state, rpc->cb_arg);
374 return;
376 error:
377 if (rpc_state != NULL)
378 evrpc_reqstate_free(rpc_state);
379 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
380 return;
384 void
385 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
387 struct evrpc *rpc;
388 EVUTIL_ASSERT(rpc_state != NULL);
389 rpc = rpc_state->rpc;
391 /* clean up all memory */
392 if (rpc_state->hook_meta != NULL)
393 evrpc_hook_context_free(rpc_state->hook_meta);
394 if (rpc_state->request != NULL)
395 rpc->request_free(rpc_state->request);
396 if (rpc_state->reply != NULL)
397 rpc->reply_free(rpc_state->reply);
398 if (rpc_state->rpc_data != NULL)
399 evbuffer_free(rpc_state->rpc_data);
400 mm_free(rpc_state);
403 static void
404 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
406 void
407 evrpc_request_done(struct evrpc_req_generic *rpc_state)
409 struct evhttp_request *req;
410 struct evrpc *rpc;
412 EVUTIL_ASSERT(rpc_state);
414 req = rpc_state->http_req;
415 rpc = rpc_state->rpc;
417 if (rpc->reply_complete(rpc_state->reply) == -1) {
418 /* the reply was not completely filled in. error out */
419 goto error;
422 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
423 /* out of memory */
424 goto error;
427 /* serialize the reply */
428 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
430 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
431 int hook_res;
433 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
435 /* do hook based tweaks to the request */
436 hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
437 rpc_state, req, rpc_state->rpc_data);
438 switch (hook_res) {
439 case EVRPC_TERMINATE:
440 goto error;
441 case EVRPC_PAUSE:
442 if (evrpc_pause_request(rpc->base, rpc_state,
443 evrpc_request_done_closure) == -1)
444 goto error;
445 return;
446 case EVRPC_CONTINUE:
447 break;
448 default:
449 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
450 hook_res == EVRPC_CONTINUE ||
451 hook_res == EVRPC_PAUSE);
455 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
456 return;
458 error:
459 if (rpc_state != NULL)
460 evrpc_reqstate_free(rpc_state);
461 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
462 return;
465 void *
466 evrpc_get_request(struct evrpc_req_generic *req)
468 return req->request;
471 void *
472 evrpc_get_reply(struct evrpc_req_generic *req)
474 return req->reply;
477 static void
478 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
480 struct evrpc_req_generic *rpc_state = arg;
481 struct evhttp_request *req;
482 EVUTIL_ASSERT(rpc_state);
483 req = rpc_state->http_req;
485 if (hook_res == EVRPC_TERMINATE)
486 goto error;
488 /* on success, we are going to transmit marshaled binary data */
489 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
490 evhttp_add_header(req->output_headers,
491 "Content-Type", "application/octet-stream");
493 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
495 evrpc_reqstate_free(rpc_state);
497 return;
499 error:
500 if (rpc_state != NULL)
501 evrpc_reqstate_free(rpc_state);
502 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
503 return;
507 /* Client implementation of RPC site */
509 static int evrpc_schedule_request(struct evhttp_connection *connection,
510 struct evrpc_request_wrapper *ctx);
512 struct evrpc_pool *
513 evrpc_pool_new(struct event_base *base)
515 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
516 if (pool == NULL)
517 return (NULL);
519 TAILQ_INIT(&pool->connections);
520 TAILQ_INIT(&pool->requests);
522 TAILQ_INIT(&pool->paused_requests);
524 TAILQ_INIT(&pool->input_hooks);
525 TAILQ_INIT(&pool->output_hooks);
527 pool->base = base;
528 pool->timeout = -1;
530 return (pool);
533 static void
534 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
536 if (request->hook_meta != NULL)
537 evrpc_hook_context_free(request->hook_meta);
538 mm_free(request->name);
539 mm_free(request);
542 void
543 evrpc_pool_free(struct evrpc_pool *pool)
545 struct evhttp_connection *connection;
546 struct evrpc_request_wrapper *request;
547 struct evrpc_hook_ctx *pause;
548 struct evrpc_hook *hook;
549 int r;
551 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
552 TAILQ_REMOVE(&pool->requests, request, next);
553 evrpc_request_wrapper_free(request);
556 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
557 TAILQ_REMOVE(&pool->paused_requests, pause, next);
558 mm_free(pause);
561 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
562 TAILQ_REMOVE(&pool->connections, connection, next);
563 evhttp_connection_free(connection);
566 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
567 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
568 EVUTIL_ASSERT(r);
571 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
572 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
573 EVUTIL_ASSERT(r);
576 mm_free(pool);
580 * Add a connection to the RPC pool. A request scheduled on the pool
581 * may use any available connection.
584 void
585 evrpc_pool_add_connection(struct evrpc_pool *pool,
586 struct evhttp_connection *connection)
588 EVUTIL_ASSERT(connection->http_server == NULL);
589 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
592 * associate an event base with this connection
594 if (pool->base != NULL)
595 evhttp_connection_set_base(connection, pool->base);
598 * unless a timeout was specifically set for a connection,
599 * the connection inherits the timeout from the pool.
601 if (connection->timeout == -1)
602 connection->timeout = pool->timeout;
605 * if we have any requests pending, schedule them with the new
606 * connections.
609 if (TAILQ_FIRST(&pool->requests) != NULL) {
610 struct evrpc_request_wrapper *request =
611 TAILQ_FIRST(&pool->requests);
612 TAILQ_REMOVE(&pool->requests, request, next);
613 evrpc_schedule_request(connection, request);
617 void
618 evrpc_pool_remove_connection(struct evrpc_pool *pool,
619 struct evhttp_connection *connection)
621 TAILQ_REMOVE(&pool->connections, connection, next);
624 void
625 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
627 struct evhttp_connection *evcon;
628 TAILQ_FOREACH(evcon, &pool->connections, next) {
629 evcon->timeout = timeout_in_secs;
631 pool->timeout = timeout_in_secs;
635 static void evrpc_reply_done(struct evhttp_request *, void *);
636 static void evrpc_request_timeout(evutil_socket_t, short, void *);
639 * Finds a connection object associated with the pool that is currently
640 * idle and can be used to make a request.
642 static struct evhttp_connection *
643 evrpc_pool_find_connection(struct evrpc_pool *pool)
645 struct evhttp_connection *connection;
646 TAILQ_FOREACH(connection, &pool->connections, next) {
647 if (TAILQ_FIRST(&connection->requests) == NULL)
648 return (connection);
651 return (NULL);
655 * Prototypes responsible for evrpc scheduling and hooking
658 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
661 * We assume that the ctx is no longer queued on the pool.
663 static int
664 evrpc_schedule_request(struct evhttp_connection *connection,
665 struct evrpc_request_wrapper *ctx)
667 struct evhttp_request *req = NULL;
668 struct evrpc_pool *pool = ctx->pool;
669 struct evrpc_status status;
671 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
672 goto error;
674 /* serialize the request data into the output buffer */
675 ctx->request_marshal(req->output_buffer, ctx->request);
677 /* we need to know the connection that we might have to abort */
678 ctx->evcon = connection;
680 /* if we get paused we also need to know the request */
681 ctx->req = req;
683 if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
684 int hook_res;
686 evrpc_hook_associate_meta(&ctx->hook_meta, connection);
688 /* apply hooks to the outgoing request */
689 hook_res = evrpc_process_hooks(&pool->output_hooks,
690 ctx, req, req->output_buffer);
692 switch (hook_res) {
693 case EVRPC_TERMINATE:
694 goto error;
695 case EVRPC_PAUSE:
696 /* we need to be explicitly resumed */
697 if (evrpc_pause_request(pool, ctx,
698 evrpc_schedule_request_closure) == -1)
699 goto error;
700 return (0);
701 case EVRPC_CONTINUE:
702 /* we can just continue */
703 break;
704 default:
705 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
706 hook_res == EVRPC_CONTINUE ||
707 hook_res == EVRPC_PAUSE);
711 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
712 return (0);
714 error:
715 memset(&status, 0, sizeof(status));
716 status.error = EVRPC_STATUS_ERR_UNSTARTED;
717 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
718 evrpc_request_wrapper_free(ctx);
719 return (-1);
722 static void
723 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
725 struct evrpc_request_wrapper *ctx = arg;
726 struct evhttp_connection *connection = ctx->evcon;
727 struct evhttp_request *req = ctx->req;
728 struct evrpc_pool *pool = ctx->pool;
729 struct evrpc_status status;
730 char *uri = NULL;
731 int res = 0;
733 if (hook_res == EVRPC_TERMINATE)
734 goto error;
736 uri = evrpc_construct_uri(ctx->name);
737 if (uri == NULL)
738 goto error;
740 if (pool->timeout > 0) {
742 * a timeout after which the whole rpc is going to be aborted.
744 struct timeval tv;
745 evutil_timerclear(&tv);
746 tv.tv_sec = pool->timeout;
747 evtimer_add(&ctx->ev_timeout, &tv);
750 /* start the request over the connection */
751 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
752 mm_free(uri);
754 if (res == -1)
755 goto error;
757 return;
759 error:
760 memset(&status, 0, sizeof(status));
761 status.error = EVRPC_STATUS_ERR_UNSTARTED;
762 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
763 evrpc_request_wrapper_free(ctx);
766 /* we just queue the paused request on the pool under the req object */
767 static int
768 evrpc_pause_request(void *vbase, void *ctx,
769 void (*cb)(void *, enum EVRPC_HOOK_RESULT))
771 struct _evrpc_hooks *base = vbase;
772 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
773 if (pause == NULL)
774 return (-1);
776 pause->ctx = ctx;
777 pause->cb = cb;
779 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
780 return (0);
784 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
786 struct _evrpc_hooks *base = vbase;
787 struct evrpc_pause_list *head = &base->pause_requests;
788 struct evrpc_hook_ctx *pause;
790 TAILQ_FOREACH(pause, head, next) {
791 if (pause->ctx == ctx)
792 break;
795 if (pause == NULL)
796 return (-1);
798 (*pause->cb)(pause->ctx, res);
799 TAILQ_REMOVE(head, pause, next);
800 mm_free(pause);
801 return (0);
805 evrpc_make_request(struct evrpc_request_wrapper *ctx)
807 struct evrpc_pool *pool = ctx->pool;
809 /* initialize the event structure for this rpc */
810 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
812 /* we better have some available connections on the pool */
813 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
816 * if no connection is available, we queue the request on the pool,
817 * the next time a connection is empty, the rpc will be send on that.
819 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
821 evrpc_pool_schedule(pool);
823 return (0);
827 struct evrpc_request_wrapper *
828 evrpc_make_request_ctx(
829 struct evrpc_pool *pool, void *request, void *reply,
830 const char *rpcname,
831 void (*req_marshal)(struct evbuffer*, void *),
832 void (*rpl_clear)(void *),
833 int (*rpl_unmarshal)(void *, struct evbuffer *),
834 void (*cb)(struct evrpc_status *, void *, void *, void *),
835 void *cbarg)
837 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
838 mm_malloc(sizeof(struct evrpc_request_wrapper));
839 if (ctx == NULL)
840 return (NULL);
842 ctx->pool = pool;
843 ctx->hook_meta = NULL;
844 ctx->evcon = NULL;
845 ctx->name = mm_strdup(rpcname);
846 if (ctx->name == NULL) {
847 mm_free(ctx);
848 return (NULL);
850 ctx->cb = cb;
851 ctx->cb_arg = cbarg;
852 ctx->request = request;
853 ctx->reply = reply;
854 ctx->request_marshal = req_marshal;
855 ctx->reply_clear = rpl_clear;
856 ctx->reply_unmarshal = rpl_unmarshal;
858 return (ctx);
861 static void
862 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
864 static void
865 evrpc_reply_done(struct evhttp_request *req, void *arg)
867 struct evrpc_request_wrapper *ctx = arg;
868 struct evrpc_pool *pool = ctx->pool;
869 int hook_res = EVRPC_CONTINUE;
871 /* cancel any timeout we might have scheduled */
872 event_del(&ctx->ev_timeout);
874 ctx->req = req;
876 /* we need to get the reply now */
877 if (req == NULL) {
878 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
879 return;
882 if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
883 evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
885 /* apply hooks to the incoming request */
886 hook_res = evrpc_process_hooks(&pool->input_hooks,
887 ctx, req, req->input_buffer);
889 switch (hook_res) {
890 case EVRPC_TERMINATE:
891 case EVRPC_CONTINUE:
892 break;
893 case EVRPC_PAUSE:
895 * if we get paused we also need to know the
896 * request. unfortunately, the underlying
897 * layer is going to free it. we need to
898 * request ownership explicitly
900 if (req != NULL)
901 evhttp_request_own(req);
903 evrpc_pause_request(pool, ctx,
904 evrpc_reply_done_closure);
905 return;
906 default:
907 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
908 hook_res == EVRPC_CONTINUE ||
909 hook_res == EVRPC_PAUSE);
913 evrpc_reply_done_closure(ctx, hook_res);
915 /* http request is being freed by underlying layer */
918 static void
919 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
921 struct evrpc_request_wrapper *ctx = arg;
922 struct evhttp_request *req = ctx->req;
923 struct evrpc_pool *pool = ctx->pool;
924 struct evrpc_status status;
925 int res = -1;
927 memset(&status, 0, sizeof(status));
928 status.http_req = req;
930 /* we need to get the reply now */
931 if (req == NULL) {
932 status.error = EVRPC_STATUS_ERR_TIMEOUT;
933 } else if (hook_res == EVRPC_TERMINATE) {
934 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
935 } else {
936 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
937 if (res == -1)
938 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
941 if (res == -1) {
942 /* clear everything that we might have written previously */
943 ctx->reply_clear(ctx->reply);
946 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
948 evrpc_request_wrapper_free(ctx);
950 /* the http layer owned the original request structure, but if we
951 * got paused, we asked for ownership and need to free it here. */
952 if (req != NULL && evhttp_request_is_owned(req))
953 evhttp_request_free(req);
955 /* see if we can schedule another request */
956 evrpc_pool_schedule(pool);
959 static void
960 evrpc_pool_schedule(struct evrpc_pool *pool)
962 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
963 struct evhttp_connection *evcon;
965 /* if no requests are pending, we have no work */
966 if (ctx == NULL)
967 return;
969 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
970 TAILQ_REMOVE(&pool->requests, ctx, next);
971 evrpc_schedule_request(evcon, ctx);
975 static void
976 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
978 struct evrpc_request_wrapper *ctx = arg;
979 struct evhttp_connection *evcon = ctx->evcon;
980 EVUTIL_ASSERT(evcon != NULL);
982 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
986 * frees potential meta data associated with a request.
989 static void
990 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
992 struct evrpc_meta *entry;
993 EVUTIL_ASSERT(meta_data != NULL);
995 while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
996 TAILQ_REMOVE(meta_data, entry, next);
997 mm_free(entry->key);
998 mm_free(entry->data);
999 mm_free(entry);
1003 static struct evrpc_hook_meta *
1004 evrpc_hook_meta_new(void)
1006 struct evrpc_hook_meta *ctx;
1007 ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1008 EVUTIL_ASSERT(ctx != NULL);
1010 TAILQ_INIT(&ctx->meta_data);
1011 ctx->evcon = NULL;
1013 return (ctx);
1016 static void
1017 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
1018 struct evhttp_connection *evcon)
1020 struct evrpc_hook_meta *ctx = *pctx;
1021 if (ctx == NULL)
1022 *pctx = ctx = evrpc_hook_meta_new();
1023 ctx->evcon = evcon;
1026 static void
1027 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
1029 evrpc_meta_data_free(&ctx->meta_data);
1030 mm_free(ctx);
1033 /* Adds meta data */
1034 void
1035 evrpc_hook_add_meta(void *ctx, const char *key,
1036 const void *data, size_t data_size)
1038 struct evrpc_request_wrapper *req = ctx;
1039 struct evrpc_hook_meta *store = NULL;
1040 struct evrpc_meta *meta = NULL;
1042 if ((store = req->hook_meta) == NULL)
1043 store = req->hook_meta = evrpc_hook_meta_new();
1045 meta = mm_malloc(sizeof(struct evrpc_meta));
1046 EVUTIL_ASSERT(meta != NULL);
1047 meta->key = mm_strdup(key);
1048 EVUTIL_ASSERT(meta->key != NULL);
1049 meta->data_size = data_size;
1050 meta->data = mm_malloc(data_size);
1051 EVUTIL_ASSERT(meta->data != NULL);
1052 memcpy(meta->data, data, data_size);
1054 TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1058 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1060 struct evrpc_request_wrapper *req = ctx;
1061 struct evrpc_meta *meta = NULL;
1063 if (req->hook_meta == NULL)
1064 return (-1);
1066 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1067 if (strcmp(meta->key, key) == 0) {
1068 *data = meta->data;
1069 *data_size = meta->data_size;
1070 return (0);
1074 return (-1);
1077 struct evhttp_connection *
1078 evrpc_hook_get_connection(void *ctx)
1080 struct evrpc_request_wrapper *req = ctx;
1081 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1085 evrpc_send_request_generic(struct evrpc_pool *pool,
1086 void *request, void *reply,
1087 void (*cb)(struct evrpc_status *, void *, void *, void *),
1088 void *cb_arg,
1089 const char *rpcname,
1090 void (*req_marshal)(struct evbuffer *, void *),
1091 void (*rpl_clear)(void *),
1092 int (*rpl_unmarshal)(void *, struct evbuffer *))
1094 struct evrpc_status status;
1095 struct evrpc_request_wrapper *ctx;
1096 ctx = evrpc_make_request_ctx(pool, request, reply,
1097 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1098 if (ctx == NULL)
1099 goto error;
1100 return (evrpc_make_request(ctx));
1101 error:
1102 memset(&status, 0, sizeof(status));
1103 status.error = EVRPC_STATUS_ERR_UNSTARTED;
1104 (*(cb))(&status, request, reply, cb_arg);
1105 return (-1);
1108 /** Takes a request object and fills it in with the right magic */
1109 static struct evrpc *
1110 evrpc_register_object(const char *name,
1111 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1112 int (*req_unmarshal)(void *, struct evbuffer *),
1113 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1114 int (*rpl_complete)(void *),
1115 void (*rpl_marshal)(struct evbuffer *, void *))
1117 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1118 if (rpc == NULL)
1119 return (NULL);
1120 rpc->uri = mm_strdup(name);
1121 if (rpc->uri == NULL) {
1122 mm_free(rpc);
1123 return (NULL);
1125 rpc->request_new = req_new;
1126 rpc->request_new_arg = req_new_arg;
1127 rpc->request_free = req_free;
1128 rpc->request_unmarshal = req_unmarshal;
1129 rpc->reply_new = rpl_new;
1130 rpc->reply_new_arg = rpl_new_arg;
1131 rpc->reply_free = rpl_free;
1132 rpc->reply_complete = rpl_complete;
1133 rpc->reply_marshal = rpl_marshal;
1134 return (rpc);
/*
 * Builds an RPC object from the given handlers and registers it with base
 * under name, using callback/cbarg as the user callback.  Returns 0 on
 * success and -1 when the RPC object cannot be created.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
	struct evrpc *obj;

	obj = evrpc_register_object(name,
	    req_new, req_new_arg, req_free, req_unmarshal,
	    rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
	if (obj == NULL) {
		return (-1);
	}

	evrpc_register_rpc(base, obj, callback, cbarg);
	return (0);
}
1156 /** accessors for obscure and undocumented functionality */
1157 struct evrpc_pool *
1158 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1160 return (ctx->pool);
1163 void
1164 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1165 struct evrpc_pool *pool)
1167 ctx->pool = pool;
1170 void
1171 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1172 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1173 void *cb_arg)
1175 ctx->cb = cb;
1176 ctx->cb_arg = cb_arg;