Blink roll 25b6bd3a7a131ffe68d809546ad1a20707915cdc:3a503f41ae42e5b79cfcd2ff10e65afde...
[chromium-blink-merge.git] / third_party / libevent / evrpc.c
blob070fd9e710c575d2832259789d3db6de13d26144
1 /*
2 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
31 #ifdef WIN32
32 #define WIN32_LEAN_AND_MEAN
33 #include <winsock2.h>
34 #include <windows.h>
35 #undef WIN32_LEAN_AND_MEAN
36 #endif
38 #include <sys/types.h>
39 #ifndef WIN32
40 #include <sys/socket.h>
41 #endif
42 #ifdef HAVE_SYS_TIME_H
43 #include <sys/time.h>
44 #endif
45 #include <sys/queue.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #ifndef WIN32
49 #include <unistd.h>
50 #endif
51 #include <errno.h>
52 #include <signal.h>
53 #include <string.h>
54 #include <assert.h>
56 #include "event.h"
57 #include "evrpc.h"
58 #include "evrpc-internal.h"
59 #include "evhttp.h"
60 #include "evutil.h"
61 #include "log.h"
63 struct evrpc_base *
64 evrpc_init(struct evhttp *http_server)
66 struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
67 if (base == NULL)
68 return (NULL);
70 /* we rely on the tagging sub system */
71 evtag_init();
73 TAILQ_INIT(&base->registered_rpcs);
74 TAILQ_INIT(&base->input_hooks);
75 TAILQ_INIT(&base->output_hooks);
76 base->http_server = http_server;
78 return (base);
81 void
82 evrpc_free(struct evrpc_base *base)
84 struct evrpc *rpc;
85 struct evrpc_hook *hook;
87 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
88 assert(evrpc_unregister_rpc(base, rpc->uri));
90 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
91 assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
93 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
94 assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
96 free(base);
99 void *
100 evrpc_add_hook(void *vbase,
101 enum EVRPC_HOOK_TYPE hook_type,
102 int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
103 void *cb_arg)
105 struct _evrpc_hooks *base = vbase;
106 struct evrpc_hook_list *head = NULL;
107 struct evrpc_hook *hook = NULL;
108 switch (hook_type) {
109 case EVRPC_INPUT:
110 head = &base->in_hooks;
111 break;
112 case EVRPC_OUTPUT:
113 head = &base->out_hooks;
114 break;
115 default:
116 assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
119 hook = calloc(1, sizeof(struct evrpc_hook));
120 assert(hook != NULL);
122 hook->process = cb;
123 hook->process_arg = cb_arg;
124 TAILQ_INSERT_TAIL(head, hook, next);
126 return (hook);
129 static int
130 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
132 struct evrpc_hook *hook = NULL;
133 TAILQ_FOREACH(hook, head, next) {
134 if (hook == handle) {
135 TAILQ_REMOVE(head, hook, next);
136 free(hook);
137 return (1);
141 return (0);
145 * remove the hook specified by the handle
149 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
151 struct _evrpc_hooks *base = vbase;
152 struct evrpc_hook_list *head = NULL;
153 switch (hook_type) {
154 case EVRPC_INPUT:
155 head = &base->in_hooks;
156 break;
157 case EVRPC_OUTPUT:
158 head = &base->out_hooks;
159 break;
160 default:
161 assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
164 return (evrpc_remove_hook_internal(head, handle));
167 static int
168 evrpc_process_hooks(struct evrpc_hook_list *head,
169 struct evhttp_request *req, struct evbuffer *evbuf)
171 struct evrpc_hook *hook;
172 TAILQ_FOREACH(hook, head, next) {
173 if (hook->process(req, evbuf, hook->process_arg) == -1)
174 return (-1);
177 return (0);
180 static void evrpc_pool_schedule(struct evrpc_pool *pool);
181 static void evrpc_request_cb(struct evhttp_request *, void *);
182 void evrpc_request_done(struct evrpc_req_generic*);
185 * Registers a new RPC with the HTTP server. The evrpc object is expected
186 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
187 * calls this function.
190 static char *
191 evrpc_construct_uri(const char *uri)
193 char *constructed_uri;
194 int constructed_uri_len;
196 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
197 if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
198 event_err(1, "%s: failed to register rpc at %s",
199 __func__, uri);
200 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
201 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
202 constructed_uri[constructed_uri_len - 1] = '\0';
204 return (constructed_uri);
208 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
209 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
211 char *constructed_uri = evrpc_construct_uri(rpc->uri);
213 rpc->base = base;
214 rpc->cb = cb;
215 rpc->cb_arg = cb_arg;
217 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
219 evhttp_set_cb(base->http_server,
220 constructed_uri,
221 evrpc_request_cb,
222 rpc);
224 free(constructed_uri);
226 return (0);
230 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
232 char *registered_uri = NULL;
233 struct evrpc *rpc;
235 /* find the right rpc; linear search might be slow */
236 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
237 if (strcmp(rpc->uri, name) == 0)
238 break;
240 if (rpc == NULL) {
241 /* We did not find an RPC with this name */
242 return (-1);
244 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
246 free((char *)rpc->uri);
247 free(rpc);
249 registered_uri = evrpc_construct_uri(name);
251 /* remove the http server callback */
252 assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
254 free(registered_uri);
255 return (0);
258 static void
259 evrpc_request_cb(struct evhttp_request *req, void *arg)
261 struct evrpc *rpc = arg;
262 struct evrpc_req_generic *rpc_state = NULL;
264 /* let's verify the outside parameters */
265 if (req->type != EVHTTP_REQ_POST ||
266 EVBUFFER_LENGTH(req->input_buffer) <= 0)
267 goto error;
270 * we might want to allow hooks to suspend the processing,
271 * but at the moment, we assume that they just act as simple
272 * filters.
274 if (evrpc_process_hooks(&rpc->base->input_hooks,
275 req, req->input_buffer) == -1)
276 goto error;
278 rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
279 if (rpc_state == NULL)
280 goto error;
282 /* let's check that we can parse the request */
283 rpc_state->request = rpc->request_new();
284 if (rpc_state->request == NULL)
285 goto error;
287 rpc_state->rpc = rpc;
289 if (rpc->request_unmarshal(
290 rpc_state->request, req->input_buffer) == -1) {
291 /* we failed to parse the request; that's a bummer */
292 goto error;
295 /* at this point, we have a well formed request, prepare the reply */
297 rpc_state->reply = rpc->reply_new();
298 if (rpc_state->reply == NULL)
299 goto error;
301 rpc_state->http_req = req;
302 rpc_state->done = evrpc_request_done;
304 /* give the rpc to the user; they can deal with it */
305 rpc->cb(rpc_state, rpc->cb_arg);
307 return;
309 error:
310 evrpc_reqstate_free(rpc_state);
311 evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
312 return;
315 void
316 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
318 /* clean up all memory */
319 if (rpc_state != NULL) {
320 struct evrpc *rpc = rpc_state->rpc;
322 if (rpc_state->request != NULL)
323 rpc->request_free(rpc_state->request);
324 if (rpc_state->reply != NULL)
325 rpc->reply_free(rpc_state->reply);
326 free(rpc_state);
330 void
331 evrpc_request_done(struct evrpc_req_generic* rpc_state)
333 struct evhttp_request *req = rpc_state->http_req;
334 struct evrpc *rpc = rpc_state->rpc;
335 struct evbuffer* data = NULL;
337 if (rpc->reply_complete(rpc_state->reply) == -1) {
338 /* the reply was not completely filled in. error out */
339 goto error;
342 if ((data = evbuffer_new()) == NULL) {
343 /* out of memory */
344 goto error;
347 /* serialize the reply */
348 rpc->reply_marshal(data, rpc_state->reply);
350 /* do hook based tweaks to the request */
351 if (evrpc_process_hooks(&rpc->base->output_hooks,
352 req, data) == -1)
353 goto error;
355 /* on success, we are going to transmit marshaled binary data */
356 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
357 evhttp_add_header(req->output_headers,
358 "Content-Type", "application/octet-stream");
361 evhttp_send_reply(req, HTTP_OK, "OK", data);
363 evbuffer_free(data);
365 evrpc_reqstate_free(rpc_state);
367 return;
369 error:
370 if (data != NULL)
371 evbuffer_free(data);
372 evrpc_reqstate_free(rpc_state);
373 evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
374 return;
377 /* Client implementation of RPC site */
379 static int evrpc_schedule_request(struct evhttp_connection *connection,
380 struct evrpc_request_wrapper *ctx);
382 struct evrpc_pool *
383 evrpc_pool_new(struct event_base *base)
385 struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
386 if (pool == NULL)
387 return (NULL);
389 TAILQ_INIT(&pool->connections);
390 TAILQ_INIT(&pool->requests);
392 TAILQ_INIT(&pool->input_hooks);
393 TAILQ_INIT(&pool->output_hooks);
395 pool->base = base;
396 pool->timeout = -1;
398 return (pool);
401 static void
402 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
404 free(request->name);
405 free(request);
408 void
409 evrpc_pool_free(struct evrpc_pool *pool)
411 struct evhttp_connection *connection;
412 struct evrpc_request_wrapper *request;
413 struct evrpc_hook *hook;
415 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
416 TAILQ_REMOVE(&pool->requests, request, next);
417 /* if this gets more complicated we need our own function */
418 evrpc_request_wrapper_free(request);
421 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
422 TAILQ_REMOVE(&pool->connections, connection, next);
423 evhttp_connection_free(connection);
426 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
427 assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
430 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
431 assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
434 free(pool);
438 * Add a connection to the RPC pool. A request scheduled on the pool
439 * may use any available connection.
442 void
443 evrpc_pool_add_connection(struct evrpc_pool *pool,
444 struct evhttp_connection *connection) {
445 assert(connection->http_server == NULL);
446 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
449 * associate an event base with this connection
451 if (pool->base != NULL)
452 evhttp_connection_set_base(connection, pool->base);
455 * unless a timeout was specifically set for a connection,
456 * the connection inherits the timeout from the pool.
458 if (connection->timeout == -1)
459 connection->timeout = pool->timeout;
462 * if we have any requests pending, schedule them with the new
463 * connections.
466 if (TAILQ_FIRST(&pool->requests) != NULL) {
467 struct evrpc_request_wrapper *request =
468 TAILQ_FIRST(&pool->requests);
469 TAILQ_REMOVE(&pool->requests, request, next);
470 evrpc_schedule_request(connection, request);
474 void
475 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
477 struct evhttp_connection *evcon;
478 TAILQ_FOREACH(evcon, &pool->connections, next) {
479 evcon->timeout = timeout_in_secs;
481 pool->timeout = timeout_in_secs;
485 static void evrpc_reply_done(struct evhttp_request *, void *);
486 static void evrpc_request_timeout(int, short, void *);
489 * Finds a connection object associated with the pool that is currently
490 * idle and can be used to make a request.
492 static struct evhttp_connection *
493 evrpc_pool_find_connection(struct evrpc_pool *pool)
495 struct evhttp_connection *connection;
496 TAILQ_FOREACH(connection, &pool->connections, next) {
497 if (TAILQ_FIRST(&connection->requests) == NULL)
498 return (connection);
501 return (NULL);
505 * We assume that the ctx is no longer queued on the pool.
507 static int
508 evrpc_schedule_request(struct evhttp_connection *connection,
509 struct evrpc_request_wrapper *ctx)
511 struct evhttp_request *req = NULL;
512 struct evrpc_pool *pool = ctx->pool;
513 struct evrpc_status status;
514 char *uri = NULL;
515 int res = 0;
517 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
518 goto error;
520 /* serialize the request data into the output buffer */
521 ctx->request_marshal(req->output_buffer, ctx->request);
523 uri = evrpc_construct_uri(ctx->name);
524 if (uri == NULL)
525 goto error;
527 /* we need to know the connection that we might have to abort */
528 ctx->evcon = connection;
530 /* apply hooks to the outgoing request */
531 if (evrpc_process_hooks(&pool->output_hooks,
532 req, req->output_buffer) == -1)
533 goto error;
535 if (pool->timeout > 0) {
537 * a timeout after which the whole rpc is going to be aborted.
539 struct timeval tv;
540 evutil_timerclear(&tv);
541 tv.tv_sec = pool->timeout;
542 evtimer_add(&ctx->ev_timeout, &tv);
545 /* start the request over the connection */
546 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
547 free(uri);
549 if (res == -1)
550 goto error;
552 return (0);
554 error:
555 memset(&status, 0, sizeof(status));
556 status.error = EVRPC_STATUS_ERR_UNSTARTED;
557 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
558 evrpc_request_wrapper_free(ctx);
559 return (-1);
563 evrpc_make_request(struct evrpc_request_wrapper *ctx)
565 struct evrpc_pool *pool = ctx->pool;
567 /* initialize the event structure for this rpc */
568 evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
569 if (pool->base != NULL)
570 event_base_set(pool->base, &ctx->ev_timeout);
572 /* we better have some available connections on the pool */
573 assert(TAILQ_FIRST(&pool->connections) != NULL);
576 * if no connection is available, we queue the request on the pool,
577 * the next time a connection is empty, the rpc will be send on that.
579 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
581 evrpc_pool_schedule(pool);
583 return (0);
586 static void
587 evrpc_reply_done(struct evhttp_request *req, void *arg)
589 struct evrpc_request_wrapper *ctx = arg;
590 struct evrpc_pool *pool = ctx->pool;
591 struct evrpc_status status;
592 int res = -1;
594 /* cancel any timeout we might have scheduled */
595 event_del(&ctx->ev_timeout);
597 memset(&status, 0, sizeof(status));
598 status.http_req = req;
600 /* we need to get the reply now */
601 if (req != NULL) {
602 /* apply hooks to the incoming request */
603 if (evrpc_process_hooks(&pool->input_hooks,
604 req, req->input_buffer) == -1) {
605 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
606 res = -1;
607 } else {
608 res = ctx->reply_unmarshal(ctx->reply,
609 req->input_buffer);
610 if (res == -1) {
611 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
614 } else {
615 status.error = EVRPC_STATUS_ERR_TIMEOUT;
618 if (res == -1) {
619 /* clear everything that we might have written previously */
620 ctx->reply_clear(ctx->reply);
623 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
625 evrpc_request_wrapper_free(ctx);
627 /* the http layer owns the request structure */
629 /* see if we can schedule another request */
630 evrpc_pool_schedule(pool);
633 static void
634 evrpc_pool_schedule(struct evrpc_pool *pool)
636 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
637 struct evhttp_connection *evcon;
639 /* if no requests are pending, we have no work */
640 if (ctx == NULL)
641 return;
643 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
644 TAILQ_REMOVE(&pool->requests, ctx, next);
645 evrpc_schedule_request(evcon, ctx);
649 static void
650 evrpc_request_timeout(int fd, short what, void *arg)
652 struct evrpc_request_wrapper *ctx = arg;
653 struct evhttp_connection *evcon = ctx->evcon;
654 assert(evcon != NULL);
656 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);