1 /* Copyright (c) 2001, Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
9 * @brief Construct a dispatch_t in safer, more OO way.
12 #define PUBSUB_PRIVATE
14 #include "lib/dispatch/dispatch.h"
15 #include "lib/dispatch/dispatch_cfg.h"
16 #include "lib/dispatch/dispatch_naming.h"
17 #include "lib/dispatch/msgtypes.h"
18 #include "lib/pubsub/pubsub_flags.h"
19 #include "lib/pubsub/pub_binding_st.h"
20 #include "lib/pubsub/pubsub_build.h"
21 #include "lib/pubsub/pubsub_builder_st.h"
22 #include "lib/pubsub/pubsub_connect.h"
24 #include "lib/container/smartlist.h"
25 #include "lib/log/util_bug.h"
26 #include "lib/malloc/malloc.h"
30 /** Construct and return a new empty pubsub_items_t. */
31 static pubsub_items_t
*
32 pubsub_items_new(void)
34 pubsub_items_t
*cfg
= tor_malloc_zero(sizeof(*cfg
));
35 cfg
->items
= smartlist_new();
36 cfg
->type_items
= smartlist_new();
40 /** Release all storage held in a pubsub_items_t. */
42 pubsub_items_free_(pubsub_items_t
*cfg
)
46 SMARTLIST_FOREACH(cfg
->items
, pubsub_cfg_t
*, item
, tor_free(item
));
47 SMARTLIST_FOREACH(cfg
->type_items
,
48 pubsub_type_cfg_t
*, item
, tor_free(item
));
49 smartlist_free(cfg
->items
);
50 smartlist_free(cfg
->type_items
);
54 /** Construct and return a new pubsub_builder_t. */
56 pubsub_builder_new(void)
58 dispatch_naming_init();
60 pubsub_builder_t
*pb
= tor_malloc_zero(sizeof(*pb
));
62 pb
->items
= pubsub_items_new();
67 * Release all storage held by a pubsub_builder_t.
69 * You'll (mostly) only want to call this function on an error case: if you're
70 * constructing a dispatch_t instead, you should call
71 * pubsub_builder_finalize() to consume the pubsub_builder_t.
74 pubsub_builder_free_(pubsub_builder_t
*pb
)
78 pubsub_items_free(pb
->items
);
84 * Create and return a pubsub_connector_t for the subsystem with ID
85 * <b>subsys</b> to use in adding publications, subscriptions, and types to
89 pubsub_connector_for_subsystem(pubsub_builder_t
*builder
,
93 ++builder
->n_connectors
;
95 pubsub_connector_t
*con
= tor_malloc_zero(sizeof(*con
));
97 con
->builder
= builder
;
98 con
->subsys_id
= subsys
;
104 * Release all storage held by a pubsub_connector_t.
107 pubsub_connector_free_(pubsub_connector_t
*con
)
113 --con
->builder
->n_connectors
;
114 tor_assert(con
->builder
->n_connectors
>= 0);
120 * Use <b>con</b> to add a request for being able to publish messages of type
121 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
124 pubsub_add_pub_(pubsub_connector_t
*con
,
126 channel_id_t channel
,
133 pubsub_cfg_t
*cfg
= tor_malloc_zero(sizeof(*cfg
));
135 memset(out
, 0, sizeof(*out
));
136 cfg
->is_publish
= true;
138 out
->msg_template
.sender
= cfg
->subsys
= con
->subsys_id
;
139 out
->msg_template
.channel
= cfg
->channel
= channel
;
140 out
->msg_template
.msg
= cfg
->msg
= msg
;
141 out
->msg_template
.type
= cfg
->type
= type
;
144 cfg
->added_by_file
= file
;
145 cfg
->added_by_line
= line
;
147 /* We're grabbing a pointer to the pub_binding_t so we can tell it about
148 * the dispatcher later on.
150 cfg
->pub_binding
= out
;
152 smartlist_add(con
->builder
->items
->items
, cfg
);
154 if (dcfg_msg_set_type(con
->builder
->cfg
, msg
, type
) < 0)
156 if (dcfg_msg_set_chan(con
->builder
->cfg
, msg
, channel
) < 0)
161 ++con
->builder
->n_errors
;
166 * Use <b>con</b> to add a request for being able to publish messages of type
167 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
168 * passing them to the callback in <b>recv_fn</b>.
171 pubsub_add_sub_(pubsub_connector_t
*con
,
173 channel_id_t channel
,
180 pubsub_cfg_t
*cfg
= tor_malloc_zero(sizeof(*cfg
));
182 cfg
->is_publish
= false;
183 cfg
->subsys
= con
->subsys_id
;
184 cfg
->channel
= channel
;
188 cfg
->added_by_file
= file
;
189 cfg
->added_by_line
= line
;
191 cfg
->recv_fn
= recv_fn
;
193 smartlist_add(con
->builder
->items
->items
, cfg
);
195 if (dcfg_msg_set_type(con
->builder
->cfg
, msg
, type
) < 0)
197 if (dcfg_msg_set_chan(con
->builder
->cfg
, msg
, channel
) < 0)
199 if (! (flags
& DISP_FLAG_STUB
)) {
200 if (dcfg_add_recv(con
->builder
->cfg
, msg
, cfg
->subsys
, recv_fn
) < 0)
206 ++con
->builder
->n_errors
;
211 * Use <b>con</b> to define the functions to use for manipulating the type
212 * <b>type</b>. Any function pointers left as NULL will be implemented as
216 pubsub_connector_register_type_(pubsub_connector_t
*con
,
218 dispatch_typefns_t
*fns
,
222 pubsub_type_cfg_t
*cfg
= tor_malloc_zero(sizeof(*cfg
));
224 memcpy(&cfg
->fns
, fns
, sizeof(*fns
));
225 cfg
->subsys
= con
->subsys_id
;
226 cfg
->added_by_file
= file
;
227 cfg
->added_by_line
= line
;
229 smartlist_add(con
->builder
->items
->type_items
, cfg
);
231 if (dcfg_type_set_fns(con
->builder
->cfg
, type
, fns
) < 0)
236 ++con
->builder
->n_errors
;
241 * Initialize the dispatch_ptr field in every relevant publish binding
245 pubsub_items_install_bindings(pubsub_items_t
*items
,
248 SMARTLIST_FOREACH_BEGIN(items
->items
, pubsub_cfg_t
*, cfg
) {
249 if (cfg
->pub_binding
) {
250 // XXXX we could skip this for STUB publishers, and for any publishers
251 // XXXX where all subscribers are STUB.
252 cfg
->pub_binding
->dispatch_ptr
= d
;
254 } SMARTLIST_FOREACH_END(cfg
);
258 * Remove the dispatch_ptr fields for all the relevant publish bindings
259 * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
260 * sending messages to a dispatcher that has been freed.
263 pubsub_items_clear_bindings(pubsub_items_t
*items
)
265 SMARTLIST_FOREACH_BEGIN(items
->items
, pubsub_cfg_t
*, cfg
) {
266 if (cfg
->pub_binding
) {
267 cfg
->pub_binding
->dispatch_ptr
= NULL
;
269 } SMARTLIST_FOREACH_END(cfg
);
273 * Create a new dispatcher as configured in a pubsub_builder_t.
275 * Consumes and frees its input.
278 pubsub_builder_finalize(pubsub_builder_t
*builder
,
279 pubsub_items_t
**items_out
)
281 dispatch_t
*dispatcher
= NULL
;
282 tor_assert_nonfatal(builder
->n_connectors
== 0);
284 if (pubsub_builder_check(builder
) < 0)
287 if (builder
->n_errors
) {
288 log_warn(LD_GENERAL
, "At least one error occurred previously when "
289 "configuring the dispatcher.");
293 dispatcher
= dispatch_new(builder
->cfg
);
298 pubsub_items_install_bindings(builder
->items
, dispatcher
);
300 *items_out
= builder
->items
;
301 builder
->items
= NULL
; /* Prevent free */
305 pubsub_builder_free(builder
);