3 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 #define WIN32_LEAN_AND_MEAN
36 #undef WIN32_LEAN_AND_MEAN
39 #include <sys/types.h>
41 #include <sys/socket.h>
43 #ifdef HAVE_SYS_TIME_H
46 #include <sys/queue.h>
59 #include "evrpc-internal.h"
65 evrpc_init(struct evhttp
*http_server
)
67 struct evrpc_base
* base
= calloc(1, sizeof(struct evrpc_base
));
71 /* we rely on the tagging sub system */
74 TAILQ_INIT(&base
->registered_rpcs
);
75 TAILQ_INIT(&base
->input_hooks
);
76 TAILQ_INIT(&base
->output_hooks
);
77 base
->http_server
= http_server
;
83 evrpc_free(struct evrpc_base
*base
)
86 struct evrpc_hook
*hook
;
88 while ((rpc
= TAILQ_FIRST(&base
->registered_rpcs
)) != NULL
) {
89 assert(evrpc_unregister_rpc(base
, rpc
->uri
));
91 while ((hook
= TAILQ_FIRST(&base
->input_hooks
)) != NULL
) {
92 assert(evrpc_remove_hook(base
, EVRPC_INPUT
, hook
));
94 while ((hook
= TAILQ_FIRST(&base
->output_hooks
)) != NULL
) {
95 assert(evrpc_remove_hook(base
, EVRPC_OUTPUT
, hook
));
101 evrpc_add_hook(void *vbase
,
102 enum EVRPC_HOOK_TYPE hook_type
,
103 int (*cb
)(struct evhttp_request
*, struct evbuffer
*, void *),
106 struct _evrpc_hooks
*base
= vbase
;
107 struct evrpc_hook_list
*head
= NULL
;
108 struct evrpc_hook
*hook
= NULL
;
111 head
= &base
->in_hooks
;
114 head
= &base
->out_hooks
;
117 assert(hook_type
== EVRPC_INPUT
|| hook_type
== EVRPC_OUTPUT
);
120 hook
= calloc(1, sizeof(struct evrpc_hook
));
121 assert(hook
!= NULL
);
124 hook
->process_arg
= cb_arg
;
125 TAILQ_INSERT_TAIL(head
, hook
, next
);
131 evrpc_remove_hook_internal(struct evrpc_hook_list
*head
, void *handle
)
133 struct evrpc_hook
*hook
= NULL
;
134 TAILQ_FOREACH(hook
, head
, next
) {
135 if (hook
== handle
) {
136 TAILQ_REMOVE(head
, hook
, next
);
146 * remove the hook specified by the handle
150 evrpc_remove_hook(void *vbase
, enum EVRPC_HOOK_TYPE hook_type
, void *handle
)
152 struct _evrpc_hooks
*base
= vbase
;
153 struct evrpc_hook_list
*head
= NULL
;
156 head
= &base
->in_hooks
;
159 head
= &base
->out_hooks
;
162 assert(hook_type
== EVRPC_INPUT
|| hook_type
== EVRPC_OUTPUT
);
165 return (evrpc_remove_hook_internal(head
, handle
));
169 evrpc_process_hooks(struct evrpc_hook_list
*head
,
170 struct evhttp_request
*req
, struct evbuffer
*evbuf
)
172 struct evrpc_hook
*hook
;
173 TAILQ_FOREACH(hook
, head
, next
) {
174 if (hook
->process(req
, evbuf
, hook
->process_arg
) == -1)
/* Forward declarations for functions that are used before their definition. */
static void evrpc_pool_schedule(struct evrpc_pool *pool);
static void evrpc_request_cb(struct evhttp_request *req, void *arg);
void evrpc_request_done(struct evrpc_req_generic *rpc_state);
/*
 * evrpc_register_rpc() (defined below) registers a new RPC with the HTTP
 * server.  The evrpc object is expected to have been filled in via the
 * EVRPC_REGISTER_OBJECT macro, which in turn calls evrpc_register_rpc().
 */
192 evrpc_construct_uri(const char *uri
)
194 char *constructed_uri
;
195 int constructed_uri_len
;
197 constructed_uri_len
= strlen(EVRPC_URI_PREFIX
) + strlen(uri
) + 1;
198 if ((constructed_uri
= malloc(constructed_uri_len
)) == NULL
)
199 event_err(1, "%s: failed to register rpc at %s",
201 memcpy(constructed_uri
, EVRPC_URI_PREFIX
, strlen(EVRPC_URI_PREFIX
));
202 memcpy(constructed_uri
+ strlen(EVRPC_URI_PREFIX
), uri
, strlen(uri
));
203 constructed_uri
[constructed_uri_len
- 1] = '\0';
205 return (constructed_uri
);
209 evrpc_register_rpc(struct evrpc_base
*base
, struct evrpc
*rpc
,
210 void (*cb
)(struct evrpc_req_generic
*, void *), void *cb_arg
)
212 char *constructed_uri
= evrpc_construct_uri(rpc
->uri
);
216 rpc
->cb_arg
= cb_arg
;
218 TAILQ_INSERT_TAIL(&base
->registered_rpcs
, rpc
, next
);
220 evhttp_set_cb(base
->http_server
,
225 free(constructed_uri
);
231 evrpc_unregister_rpc(struct evrpc_base
*base
, const char *name
)
233 char *registered_uri
= NULL
;
236 /* find the right rpc; linear search might be slow */
237 TAILQ_FOREACH(rpc
, &base
->registered_rpcs
, next
) {
238 if (strcmp(rpc
->uri
, name
) == 0)
242 /* We did not find an RPC with this name */
245 TAILQ_REMOVE(&base
->registered_rpcs
, rpc
, next
);
247 free((char *)rpc
->uri
);
250 registered_uri
= evrpc_construct_uri(name
);
252 /* remove the http server callback */
253 assert(evhttp_del_cb(base
->http_server
, registered_uri
) == 0);
255 free(registered_uri
);
260 evrpc_request_cb(struct evhttp_request
*req
, void *arg
)
262 struct evrpc
*rpc
= arg
;
263 struct evrpc_req_generic
*rpc_state
= NULL
;
265 /* let's verify the outside parameters */
266 if (req
->type
!= EVHTTP_REQ_POST
||
267 EVBUFFER_LENGTH(req
->input_buffer
) <= 0)
271 * we might want to allow hooks to suspend the processing,
272 * but at the moment, we assume that they just act as simple
275 if (evrpc_process_hooks(&rpc
->base
->input_hooks
,
276 req
, req
->input_buffer
) == -1)
279 rpc_state
= calloc(1, sizeof(struct evrpc_req_generic
));
280 if (rpc_state
== NULL
)
283 /* let's check that we can parse the request */
284 rpc_state
->request
= rpc
->request_new();
285 if (rpc_state
->request
== NULL
)
288 rpc_state
->rpc
= rpc
;
290 if (rpc
->request_unmarshal(
291 rpc_state
->request
, req
->input_buffer
) == -1) {
292 /* we failed to parse the request; that's a bummer */
296 /* at this point, we have a well formed request, prepare the reply */
298 rpc_state
->reply
= rpc
->reply_new();
299 if (rpc_state
->reply
== NULL
)
302 rpc_state
->http_req
= req
;
303 rpc_state
->done
= evrpc_request_done
;
305 /* give the rpc to the user; they can deal with it */
306 rpc
->cb(rpc_state
, rpc
->cb_arg
);
311 evrpc_reqstate_free(rpc_state
);
312 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, "Service Error");
317 evrpc_reqstate_free(struct evrpc_req_generic
* rpc_state
)
319 /* clean up all memory */
320 if (rpc_state
!= NULL
) {
321 struct evrpc
*rpc
= rpc_state
->rpc
;
323 if (rpc_state
->request
!= NULL
)
324 rpc
->request_free(rpc_state
->request
);
325 if (rpc_state
->reply
!= NULL
)
326 rpc
->reply_free(rpc_state
->reply
);
332 evrpc_request_done(struct evrpc_req_generic
* rpc_state
)
334 struct evhttp_request
*req
= rpc_state
->http_req
;
335 struct evrpc
*rpc
= rpc_state
->rpc
;
336 struct evbuffer
* data
= NULL
;
338 if (rpc
->reply_complete(rpc_state
->reply
) == -1) {
339 /* the reply was not completely filled in. error out */
343 if ((data
= evbuffer_new()) == NULL
) {
348 /* serialize the reply */
349 rpc
->reply_marshal(data
, rpc_state
->reply
);
351 /* do hook based tweaks to the request */
352 if (evrpc_process_hooks(&rpc
->base
->output_hooks
,
356 /* on success, we are going to transmit marshaled binary data */
357 if (evhttp_find_header(req
->output_headers
, "Content-Type") == NULL
) {
358 evhttp_add_header(req
->output_headers
,
359 "Content-Type", "application/octet-stream");
362 evhttp_send_reply(req
, HTTP_OK
, "OK", data
);
366 evrpc_reqstate_free(rpc_state
);
373 evrpc_reqstate_free(rpc_state
);
374 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, "Service Error");
378 /* Client implementation of RPC site */
/* Forward declaration: starts one queued RPC on the given connection. */
static int evrpc_schedule_request(
    struct evhttp_connection *connection,
    struct evrpc_request_wrapper *ctx);
384 evrpc_pool_new(struct event_base
*base
)
386 struct evrpc_pool
*pool
= calloc(1, sizeof(struct evrpc_pool
));
390 TAILQ_INIT(&pool
->connections
);
391 TAILQ_INIT(&pool
->requests
);
393 TAILQ_INIT(&pool
->input_hooks
);
394 TAILQ_INIT(&pool
->output_hooks
);
403 evrpc_request_wrapper_free(struct evrpc_request_wrapper
*request
)
410 evrpc_pool_free(struct evrpc_pool
*pool
)
412 struct evhttp_connection
*connection
;
413 struct evrpc_request_wrapper
*request
;
414 struct evrpc_hook
*hook
;
416 while ((request
= TAILQ_FIRST(&pool
->requests
)) != NULL
) {
417 TAILQ_REMOVE(&pool
->requests
, request
, next
);
418 /* if this gets more complicated we need our own function */
419 evrpc_request_wrapper_free(request
);
422 while ((connection
= TAILQ_FIRST(&pool
->connections
)) != NULL
) {
423 TAILQ_REMOVE(&pool
->connections
, connection
, next
);
424 evhttp_connection_free(connection
);
427 while ((hook
= TAILQ_FIRST(&pool
->input_hooks
)) != NULL
) {
428 assert(evrpc_remove_hook(pool
, EVRPC_INPUT
, hook
));
431 while ((hook
= TAILQ_FIRST(&pool
->output_hooks
)) != NULL
) {
432 assert(evrpc_remove_hook(pool
, EVRPC_OUTPUT
, hook
));
439 * Add a connection to the RPC pool. A request scheduled on the pool
440 * may use any available connection.
444 evrpc_pool_add_connection(struct evrpc_pool
*pool
,
445 struct evhttp_connection
*connection
) {
446 assert(connection
->http_server
== NULL
);
447 TAILQ_INSERT_TAIL(&pool
->connections
, connection
, next
);
450 * associate an event base with this connection
452 if (pool
->base
!= NULL
)
453 evhttp_connection_set_base(connection
, pool
->base
);
456 * unless a timeout was specifically set for a connection,
457 * the connection inherits the timeout from the pool.
459 if (connection
->timeout
== -1)
460 connection
->timeout
= pool
->timeout
;
463 * if we have any requests pending, schedule them with the new
467 if (TAILQ_FIRST(&pool
->requests
) != NULL
) {
468 struct evrpc_request_wrapper
*request
=
469 TAILQ_FIRST(&pool
->requests
);
470 TAILQ_REMOVE(&pool
->requests
, request
, next
);
471 evrpc_schedule_request(connection
, request
);
476 evrpc_pool_set_timeout(struct evrpc_pool
*pool
, int timeout_in_secs
)
478 struct evhttp_connection
*evcon
;
479 TAILQ_FOREACH(evcon
, &pool
->connections
, next
) {
480 evcon
->timeout
= timeout_in_secs
;
482 pool
->timeout
= timeout_in_secs
;
/* Forward declarations of the client-side completion and timeout callbacks. */
static void evrpc_reply_done(struct evhttp_request *req, void *arg);
static void evrpc_request_timeout(int fd, short what, void *arg);
490 * Finds a connection object associated with the pool that is currently
491 * idle and can be used to make a request.
493 static struct evhttp_connection
*
494 evrpc_pool_find_connection(struct evrpc_pool
*pool
)
496 struct evhttp_connection
*connection
;
497 TAILQ_FOREACH(connection
, &pool
->connections
, next
) {
498 if (TAILQ_FIRST(&connection
->requests
) == NULL
)
506 * We assume that the ctx is no longer queued on the pool.
509 evrpc_schedule_request(struct evhttp_connection
*connection
,
510 struct evrpc_request_wrapper
*ctx
)
512 struct evhttp_request
*req
= NULL
;
513 struct evrpc_pool
*pool
= ctx
->pool
;
514 struct evrpc_status status
;
518 if ((req
= evhttp_request_new(evrpc_reply_done
, ctx
)) == NULL
)
521 /* serialize the request data into the output buffer */
522 ctx
->request_marshal(req
->output_buffer
, ctx
->request
);
524 uri
= evrpc_construct_uri(ctx
->name
);
528 /* we need to know the connection that we might have to abort */
529 ctx
->evcon
= connection
;
531 /* apply hooks to the outgoing request */
532 if (evrpc_process_hooks(&pool
->output_hooks
,
533 req
, req
->output_buffer
) == -1)
536 if (pool
->timeout
> 0) {
538 * a timeout after which the whole rpc is going to be aborted.
541 evutil_timerclear(&tv
);
542 tv
.tv_sec
= pool
->timeout
;
543 evtimer_add(&ctx
->ev_timeout
, &tv
);
546 /* start the request over the connection */
547 res
= evhttp_make_request(connection
, req
, EVHTTP_REQ_POST
, uri
);
556 memset(&status
, 0, sizeof(status
));
557 status
.error
= EVRPC_STATUS_ERR_UNSTARTED
;
558 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
559 evrpc_request_wrapper_free(ctx
);
564 evrpc_make_request(struct evrpc_request_wrapper
*ctx
)
566 struct evrpc_pool
*pool
= ctx
->pool
;
568 /* initialize the event structure for this rpc */
569 evtimer_set(&ctx
->ev_timeout
, evrpc_request_timeout
, ctx
);
570 if (pool
->base
!= NULL
)
571 event_base_set(pool
->base
, &ctx
->ev_timeout
);
573 /* we better have some available connections on the pool */
574 assert(TAILQ_FIRST(&pool
->connections
) != NULL
);
577 * if no connection is available, we queue the request on the pool,
578 * the next time a connection is empty, the rpc will be send on that.
580 TAILQ_INSERT_TAIL(&pool
->requests
, ctx
, next
);
582 evrpc_pool_schedule(pool
);
588 evrpc_reply_done(struct evhttp_request
*req
, void *arg
)
590 struct evrpc_request_wrapper
*ctx
= arg
;
591 struct evrpc_pool
*pool
= ctx
->pool
;
592 struct evrpc_status status
;
595 /* cancel any timeout we might have scheduled */
596 event_del(&ctx
->ev_timeout
);
598 memset(&status
, 0, sizeof(status
));
599 status
.http_req
= req
;
601 /* we need to get the reply now */
603 /* apply hooks to the incoming request */
604 if (evrpc_process_hooks(&pool
->input_hooks
,
605 req
, req
->input_buffer
) == -1) {
606 status
.error
= EVRPC_STATUS_ERR_HOOKABORTED
;
609 res
= ctx
->reply_unmarshal(ctx
->reply
,
612 status
.error
= EVRPC_STATUS_ERR_BADPAYLOAD
;
616 status
.error
= EVRPC_STATUS_ERR_TIMEOUT
;
620 /* clear everything that we might have written previously */
621 ctx
->reply_clear(ctx
->reply
);
624 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
626 evrpc_request_wrapper_free(ctx
);
628 /* the http layer owns the request structure */
630 /* see if we can schedule another request */
631 evrpc_pool_schedule(pool
);
635 evrpc_pool_schedule(struct evrpc_pool
*pool
)
637 struct evrpc_request_wrapper
*ctx
= TAILQ_FIRST(&pool
->requests
);
638 struct evhttp_connection
*evcon
;
640 /* if no requests are pending, we have no work */
644 if ((evcon
= evrpc_pool_find_connection(pool
)) != NULL
) {
645 TAILQ_REMOVE(&pool
->requests
, ctx
, next
);
646 evrpc_schedule_request(evcon
, ctx
);
651 evrpc_request_timeout(int fd
, short what
, void *arg
)
653 struct evrpc_request_wrapper
*ctx
= arg
;
654 struct evhttp_connection
*evcon
= ctx
->evcon
;
655 assert(evcon
!= NULL
);
657 evhttp_connection_fail(evcon
, EVCON_HTTP_TIMEOUT
);