Merge pull request #14 from lwes/pluggable-emission
[lwes-erlang/github-mirror.git] / src / lwes_multi_emitter.erl
blob647506b41e31a45ffb2b5d285f3e2b585964fadb
2 % This module implements an M of N emitter. Given N different channels,
3 % when emit is called, the event is emitted over M of them. You can use
4 % this in situations where you want to mimic multicast by emitting the
5 % same event to multiple places, in addition to load balancing the emission
6 % across a set of machines
8 -module (lwes_multi_emitter).
10 -include ("lwes_internal.hrl").
12 %% API
13 -export ([ new/1,
14 select/1,
15 emit/2,
16 close/1
17 ]).
19 %%====================================================================
20 %% API
21 %%====================================================================
22 new (Config) ->
23 new0 (normalize_emitters_config(Config)).
25 new0 ({NumToSelect, Type, ListOfSubConfigs})
26 when is_integer(NumToSelect),
27 (is_atom(Type) andalso (Type =:= group orelse Type =:= random)),
28 is_list(ListOfSubConfigs) ->
30 Max = length(ListOfSubConfigs),
31 case NumToSelect of
32 _ when NumToSelect >= 1; NumToSelect > Max ->
33 { ok,
34 #lwes_multi_emitter {
35 type = Type,
36 max = Max,
37 num = NumToSelect,
38 configs = [ begin
39 Cout =
40 case new0(Config) of
41 {ok, C} -> C;
42 C -> C
43 end,
44 Cout
45 end
46 || Config <- ListOfSubConfigs ]
49 _ ->
50 { error, bad_m_value }
51 end;
52 new0 ({Module,Config}) when is_atom(Module), is_list(Config) ->
53 Emitter = Module:new (Config),
54 lwes_stats:initialize(Module:id(Emitter)),
55 {Module, Emitter};
56 new0 (Config) ->
57 Emitter = lwes_emitter_udp:new(Config),
58 lwes_stats:initialize(lwes_emitter_udp:id(Emitter)),
59 {lwes_emitter_udp, Emitter}.
61 % Normalize all emitter configuration to the form used by lwes_multi_emitter.
63 % This clause is for config of pluggable emission for a single emitter
64 % {module_implementing_emitter_behaviour, Config}
65 normalize_emitters_config ({M, Config}) when is_atom(M) ->
66 {1, group, [{M, Config}]};
67 % This clause is for config which has a number and a list of SubConfigs
68 % {NumberToEmitTo, ListOfSubConfigs}
69 normalize_emitters_config ({N, L}) when is_integer(N), is_list(L) ->
70 {N, group, L};
71 % This clause is for the simple case of a single Ip/Port so
72 % {"127.0.0.1",9191}
73 normalize_emitters_config (C = {Ip, Port}) when is_list(Ip), ?is_uint16(Port) ->
74 {1, group, [C]};
75 % This clause is for the simple case of a single Ip/Port so
76 % {"127.0.0.1",9191, [other,config]}
77 normalize_emitters_config (C = {Ip, Port,Config})
78 when is_list(Ip), ?is_uint16(Port), is_list(Config) ->
79 {1, group, [C]};
80 % This is for the case where the config is actually already in the form
81 % the lwes_mulit_emitter recognizes
82 normalize_emitters_config ({NumToSelect, Type, ListOfSubConfigs})
83 when is_integer(NumToSelect),
84 (is_atom(Type) andalso (Type =:= group orelse Type =:= random)),
85 is_list(ListOfSubConfigs) ->
86 case ListOfSubConfigs of
87 L = [{_,_}] ->
88 {NumToSelect, Type, L};
89 L = [{_,Port,Config}] when ?is_uint16(Port), is_list(Config) ->
90 {NumToSelect, Type, L};
91 L when is_list(L) ->
92 {NumToSelect, Type,[ sub_normalize(S) || S <- ListOfSubConfigs ]}
93 end.
95 sub_normalize (C = {Ip, Port}) when is_list(Ip), ?is_uint16(Port) ->
97 sub_normalize (C = {Ip, Port, Config})
98 when is_list(Ip), ?is_uint16(Port), is_list(Config) ->
100 sub_normalize (C = {M, Config}) when is_atom(M), is_list(Config) ->
102 sub_normalize (C) ->
105 % returns a sorted list of places to emit to in the form
106 % [ {EmitterModule, [State0, ...]}, ... ]
107 select (C) ->
108 collate(lists:sort(lists:flatten(select0(C)))).
110 select0 (#lwes_multi_emitter {type = _, max = N,
111 num = N, configs = Configs}) ->
112 select0 (Configs);
113 select0 (#lwes_multi_emitter {type = _, max = M,
114 num = N, configs = Configs}) ->
115 Start = rand:uniform(M),
116 Config = list_to_tuple (Configs),
117 Indices = wrapped_range (Start, N, M),
118 case Indices of
119 [I] -> select0(element(I, Config));
120 _ -> [ select0(element(I, Config)) || I <- Indices ]
121 end;
122 select0 (A = {_,P}) when is_integer(P) ->
124 select0 (L) when is_list(L) ->
125 [ select0(E) || E <- L ];
126 select0 (A) ->
129 wrapped_range (Start, Number, Max) when Start > Max ->
130 wrapped_range (case Start rem Max of 0 -> Max; V -> V end, Number, Max);
131 wrapped_range (Start, Number, Max) ->
132 wrapped_range (Start, Number, Max, []).
134 % determine a range of integers which wrap
135 wrapped_range (_, 0, _, Accumulated) ->
136 lists:reverse (Accumulated);
137 wrapped_range (Max, Number, Max, Accumulated) ->
138 wrapped_range (1, Number - 1, Max, [Max | Accumulated]);
139 wrapped_range (Current, Number, Max, Accumulated) ->
140 wrapped_range (Current + 1, Number - 1, Max, [Current | Accumulated]).
142 collate (L) ->
143 collate0(L,[]).
145 collate0 ([], Accum) ->
146 Accum;
147 collate0 ([{M,State}|RestIn],[]) ->
148 collate0 (RestIn, [{M,[State]}]);
149 collate0 ([{M,State} | RestIn], [{M,StateList} | RestOut]) ->
150 collate0 (RestIn, [{M,[State | StateList]} | RestOut]);
151 collate0 ([{M,State} | RestIn], AccumIn = [{N,_} | _]) when M =/= N ->
152 collate0 (RestIn, [{M,[State]} | AccumIn]).
154 emit (AllEmitters, Event) ->
155 SelectedEmitters = select(AllEmitters),
156 % select/1 will return a list of 2-tuples of the module to use and the
157 % emitter configs for that module
158 lists:foreach (
159 fun ({Module, Emitters}) ->
160 % one callback to prep the event for the Emitters
161 Packet = Module:prep (Event),
163 % then loop over emitters
164 lists:foreach (fun (Emitter) ->
165 Id = Module:id (Emitter),
166 case Module:emit (Emitter, Packet) of
167 ok -> lwes_stats:increment_sent (Id);
168 {error, _} -> lwes_stats:increment_errors (Id)
170 end,
171 Emitters)
172 end,
173 SelectedEmitters),
176 close (#lwes_multi_emitter { configs = Configs }) ->
177 [ close(C) || C <- Configs ];
178 close ({Module,Config}) ->
179 lwes_stats:delete (Module:id(Config)),
180 Module:close(Config).
182 %%====================================================================
183 %% Internal functions
184 %%====================================================================
186 %%====================================================================
187 %% Test functions
188 %%====================================================================
189 -ifdef (TEST).
190 -include_lib ("eunit/include/eunit.hrl").
192 config (basic) ->
193 { "127.0.0.1", 9191 };
194 config (random) ->
195 { 2, random,
196 [ {"127.0.0.1",30000}, {"127.0.0.1",30001}, {"127.0.0.1",30002} ] };
197 config (group) ->
198 { 3, group,
199 [ {1, random, [ { "127.0.0.1",5390 }, { "127.0.0.1",5391 } ] },
200 {1, random, [ { "127.0.0.1",5392 }, { "127.0.0.1",5393 } ] },
201 {1, random, [ { "127.0.0.1",5394 }, { "127.0.0.1",5395 } ] }
205 possible_answers (basic) ->
207 [{{127,0,0,1},9191}]
209 possible_answers (random) ->
211 [{{127,0,0,1},30000}, {{127,0,0,1},30001}],
212 [{{127,0,0,1},30001}, {{127,0,0,1},30002}],
213 [{{127,0,0,1},30000}, {{127,0,0,1},30002}]
215 possible_answers (group) ->
217 [{{127,0,0,1},5390}, {{127,0,0,1},5392}, {{127,0,0,1},5394}],
218 [{{127,0,0,1},5390}, {{127,0,0,1},5392}, {{127,0,0,1},5395}],
219 [{{127,0,0,1},5390}, {{127,0,0,1},5393}, {{127,0,0,1},5394}],
220 [{{127,0,0,1},5390}, {{127,0,0,1},5393}, {{127,0,0,1},5395}],
221 [{{127,0,0,1},5391}, {{127,0,0,1},5392}, {{127,0,0,1},5394}],
222 [{{127,0,0,1},5391}, {{127,0,0,1},5392}, {{127,0,0,1},5395}],
223 [{{127,0,0,1},5391}, {{127,0,0,1},5393}, {{127,0,0,1},5394}],
224 [{{127,0,0,1},5391}, {{127,0,0,1},5393}, {{127,0,0,1},5395}]
227 test_one(T) ->
228 Config = config(T),
229 {ok, C} = new(Config),
230 Results =
231 lists:foldl (
232 fun (_, A) ->
233 % jumping through a few hoops here as I changed the way select
234 % worked to not return the results of getting the identifier as
235 % it worked before. So in this case we unbox the selection
236 [{lwes_emitter_udp, Selected}] = select(C),
237 % then sort the list of Ip/Address pairs
238 Ids = lists:sort([ lwes_emitter_udp:id (S) || S <- Selected ]),
239 Answers = possible_answers(T),
240 % finally we check to see if we have an answer
241 lists:member (Ids, Answers) and A
242 end,
243 true,
244 lists:seq(1,100)),
245 close(C),
246 Results.
248 check_selection_test_ () ->
249 { setup,
250 fun() ->
251 case lwes_stats:start_link() of
252 {error,{already_started,_}} -> exists;
253 {ok, Pid} -> Pid
255 end,
256 fun (exists) -> ok;
257 (Pid) -> unlink(Pid), gen_server:stop(Pid)
258 end,
260 ?_assert(test_one(T))
261 || T <- [basic, random, group]
265 normalize_test_ () ->
267 ?_assertEqual (Expected, normalize_emitters_config(Given))
268 || {Given, Expected}
269 <- [
270 { {"127.0.0.1",9191},
271 {1,group,[{"127.0.0.1",9191}]} },
272 { {"127.0.0.1",9191,[{ttl,25}]},
273 {1,group,[{"127.0.0.1",9191,[{ttl,25}]}]} },
274 { {2, [{"127.0.0.1",9191},{"127.0.0.1",9292}]},
275 {2,group,[{"127.0.0.1",9191},{"127.0.0.1",9292}]} },
276 { {1, [{"127.0.0.1",9191},{"127.0.0.1",9292}]},
277 {1,group,[{"127.0.0.1",9191},{"127.0.0.1",9292}]} },
278 { {2, random, [{"127.0.0.1",30000},{"127.0.0.1",30001},{"127.0.0.1",30002}]},
279 {2, random, [{"127.0.0.1",30000}, {"127.0.0.1",30001}, {"127.0.0.1",30002}]} },
280 { {3, group, [{1,random,[{"127.0.0.1",5301},{"127.0.0.1",5302}]},{1,random,[{"127.0.0.1",5311},{"127.0.0.1",5312}]},{1,random,[{"127.0.0.1",5321},{"127.0.0.1",5322}]}]},
281 {3, group, [{1,random,[{"127.0.0.1",5301},{"127.0.0.1",5302}]},{1,random,[{"127.0.0.1",5311},{"127.0.0.1",5312}]},{1,random,[{"127.0.0.1",5321},{"127.0.0.1",5322}]}]} },
282 { {2, group, [{2, random, [{"127.0.0.1",30000},{"127.0.0.1",30001},{"127.0.0.1",30002}]},{lwes_emitter_stdout,[{label,stdout}]}]},
283 {2, group, [{2, random, [{"127.0.0.1",30000},{"127.0.0.1",30001},{"127.0.0.1",30002}]},{lwes_emitter_stdout,[{label,stdout}]}]}
285 % TODO: support labels and sub-labels
286 % {"127.0.0.1",9191} ->
287 % {"127.0.0.1",9191,[{label, abc}]} ->
288 % {1, group, [{"127.0.0.1",9191}], abc}
290 % {2,group,[{"127.0.0.1",9191},{"127.0.0.1",9292}]} ->
291 % {2, group, [{"127.0.0.1",9191},{"127.0.0.1",9292}], abc}
293 % {2, group, [{2, random, [{"127.0.0.1",30000},
294 % {"127.0.0.1",30001},
295 % {"127.0.0.1",30002}]},
296 % {lwes_emitter_stdout,[{label,stdout}]}
298 % } ->
299 % {2, group, [{2, random, [{"127.0.0.1",30000},
300 % {"127.0.0.1",30001},
301 % {"127.0.0.1",30002}], [{label,udp}]},
302 % {lwes_emitter_stdout,[{label,stdout}]}
303 % ], [{label, grid}]
309 -endif.