2 % This module implements an M of N emitter. Given N different channels,
3 % when emit is called, the event is emitted over M of them. You can use
4 % this in situations where you want to mimic multicast by emitting the
5 % same event to multiple places, in addition to load balancing the emission
6 % across a set of machines
8 -module (lwes_multi_emitter
).
10 -include ("lwes_internal.hrl").
19 %%====================================================================
21 %%====================================================================
23 new0 (normalize_emitters_config(Config
)).
25 new0 ({NumToSelect
, Type
, ListOfSubConfigs
})
26 when is_integer(NumToSelect
),
27 (is_atom(Type
) andalso (Type
=:= group orelse Type
=:= random
)),
28 is_list(ListOfSubConfigs
) ->
30 Max
= length(ListOfSubConfigs
),
32 _
when NumToSelect
>= 1; NumToSelect
> Max
->
46 || Config
<- ListOfSubConfigs
]
50 { error
, bad_m_value
}
52 new0 ({Module
,Config
}) when is_atom(Module
), is_list(Config
) ->
53 Emitter
= Module:new (Config
),
54 lwes_stats:initialize(Module:id(Emitter
)),
57 Emitter
= lwes_emitter_udp:new(Config
),
58 lwes_stats:initialize(lwes_emitter_udp:id(Emitter
)),
59 {lwes_emitter_udp
, Emitter
}.
61 % Normalize all emitter configuration to the form used by lwes_multi_emitter.
63 % This clause is for config of pluggable emission for a single emitter
64 % {module_implementing_emitter_behaviour, Config}
65 normalize_emitters_config ({M
, Config
}) when is_atom(M
) ->
66 {1, group
, [{M
, Config
}]};
67 % This clause is for config which has a number and a list of SubConfigs
68 % {NumberToEmitTo, ListOfSubConfigs}
69 normalize_emitters_config ({N
, L
}) when is_integer(N
), is_list(L
) ->
71 % This clause is for the simple case of a single Ip/Port so
73 normalize_emitters_config (C
= {Ip
, Port
}) when is_list(Ip
), ?
is_uint16(Port
) ->
75 % This clause is for the simple case of a single Ip/Port so
76 % {"127.0.0.1",9191, [other,config]}
77 normalize_emitters_config (C
= {Ip
, Port
,Config
})
78 when is_list(Ip
), ?
is_uint16(Port
), is_list(Config
) ->
80 % This is for the case where the config is actually already in the form
81 % the lwes_mulit_emitter recognizes
82 normalize_emitters_config ({NumToSelect
, Type
, ListOfSubConfigs
})
83 when is_integer(NumToSelect
),
84 (is_atom(Type
) andalso (Type
=:= group orelse Type
=:= random
)),
85 is_list(ListOfSubConfigs
) ->
86 case ListOfSubConfigs
of
88 {NumToSelect
, Type
, L
};
89 L
= [{_
,Port
,Config
}] when ?
is_uint16(Port
), is_list(Config
) ->
90 {NumToSelect
, Type
, L
};
92 {NumToSelect
, Type
,[ sub_normalize(S
) || S
<- ListOfSubConfigs
]}
95 sub_normalize (C
= {Ip
, Port
}) when is_list(Ip
), ?
is_uint16(Port
) ->
97 sub_normalize (C
= {Ip
, Port
, Config
})
98 when is_list(Ip
), ?
is_uint16(Port
), is_list(Config
) ->
100 sub_normalize (C
= {M
, Config
}) when is_atom(M
), is_list(Config
) ->
105 % returns a sorted list of places to emit to in the form
106 % [ {EmitterModule, [State0, ...]}, ... ]
108 collate(lists:sort(lists:flatten(select0(C
)))).
110 select0 (#lwes_multi_emitter
{type
= _
, max
= N
,
111 num
= N
, configs
= Configs
}) ->
113 select0 (#lwes_multi_emitter
{type
= _
, max
= M
,
114 num
= N
, configs
= Configs
}) ->
115 Start
= rand:uniform(M
),
116 Config
= list_to_tuple (Configs
),
117 Indices
= wrapped_range (Start
, N
, M
),
119 [I
] -> select0(element(I
, Config
));
120 _
-> [ select0(element(I
, Config
)) || I
<- Indices
]
122 select0 (A
= {_
,P
}) when is_integer(P
) ->
124 select0 (L
) when is_list(L
) ->
125 [ select0(E
) || E
<- L
];
129 wrapped_range (Start
, Number
, Max
) when Start
> Max
->
130 wrapped_range (case Start rem Max
of 0 -> Max
; V
-> V
end, Number
, Max
);
131 wrapped_range (Start
, Number
, Max
) ->
132 wrapped_range (Start
, Number
, Max
, []).
134 % determine a range of integers which wrap
135 wrapped_range (_
, 0, _
, Accumulated
) ->
136 lists:reverse (Accumulated
);
137 wrapped_range (Max
, Number
, Max
, Accumulated
) ->
138 wrapped_range (1, Number
- 1, Max
, [Max
| Accumulated
]);
139 wrapped_range (Current
, Number
, Max
, Accumulated
) ->
140 wrapped_range (Current
+ 1, Number
- 1, Max
, [Current
| Accumulated
]).
145 collate0 ([], Accum
) ->
147 collate0 ([{M
,State
}|RestIn
],[]) ->
148 collate0 (RestIn
, [{M
,[State
]}]);
149 collate0 ([{M
,State
} | RestIn
], [{M
,StateList
} | RestOut
]) ->
150 collate0 (RestIn
, [{M
,[State
| StateList
]} | RestOut
]);
151 collate0 ([{M
,State
} | RestIn
], AccumIn
= [{N
,_
} | _
]) when M
=/= N
->
152 collate0 (RestIn
, [{M
,[State
]} | AccumIn
]).
154 emit (AllEmitters
, Event
) ->
155 SelectedEmitters
= select(AllEmitters
),
156 % select/1 will return a list of 2-tuples of the module to use and the
157 % emitter configs for that module
159 fun ({Module
, Emitters
}) ->
160 % one callback to prep the event for the Emitters
161 Packet
= Module:prep (Event
),
163 % then loop over emitters
164 lists:foreach (fun (Emitter
) ->
165 Id
= Module:id (Emitter
),
166 case Module:emit (Emitter
, Packet
) of
167 ok
-> lwes_stats:increment_sent (Id
);
168 {error
, _
} -> lwes_stats:increment_errors (Id
)
176 close (#lwes_multi_emitter
{ configs
= Configs
}) ->
177 [ close(C
) || C
<- Configs
];
178 close ({Module
,Config
}) ->
179 lwes_stats:delete (Module:id(Config
)),
180 Module:close(Config
).
182 %%====================================================================
183 %% Internal functions
184 %%====================================================================
186 %%====================================================================
188 %%====================================================================
190 -include_lib ("eunit/include/eunit.hrl").
193 { "127.0.0.1", 9191 };
196 [ {"127.0.0.1",30000}, {"127.0.0.1",30001}, {"127.0.0.1",30002} ] };
199 [ {1, random
, [ { "127.0.0.1",5390 }, { "127.0.0.1",5391 } ] },
200 {1, random
, [ { "127.0.0.1",5392 }, { "127.0.0.1",5393 } ] },
201 {1, random
, [ { "127.0.0.1",5394 }, { "127.0.0.1",5395 } ] }
205 possible_answers (basic
) ->
209 possible_answers (random
) ->
211 [{{127,0,0,1},30000}, {{127,0,0,1},30001}],
212 [{{127,0,0,1},30001}, {{127,0,0,1},30002}],
213 [{{127,0,0,1},30000}, {{127,0,0,1},30002}]
215 possible_answers (group
) ->
217 [{{127,0,0,1},5390}, {{127,0,0,1},5392}, {{127,0,0,1},5394}],
218 [{{127,0,0,1},5390}, {{127,0,0,1},5392}, {{127,0,0,1},5395}],
219 [{{127,0,0,1},5390}, {{127,0,0,1},5393}, {{127,0,0,1},5394}],
220 [{{127,0,0,1},5390}, {{127,0,0,1},5393}, {{127,0,0,1},5395}],
221 [{{127,0,0,1},5391}, {{127,0,0,1},5392}, {{127,0,0,1},5394}],
222 [{{127,0,0,1},5391}, {{127,0,0,1},5392}, {{127,0,0,1},5395}],
223 [{{127,0,0,1},5391}, {{127,0,0,1},5393}, {{127,0,0,1},5394}],
224 [{{127,0,0,1},5391}, {{127,0,0,1},5393}, {{127,0,0,1},5395}]
229 {ok
, C
} = new(Config
),
233 % jumping through a few hoops here as I changed the way select
234 % worked to not return the results of getting the identifier as
235 % it worked before. So in this case we unbox the selection
236 [{lwes_emitter_udp
, Selected
}] = select(C
),
237 % then sort the list of Ip/Address pairs
238 Ids
= lists:sort([ lwes_emitter_udp:id (S
) || S
<- Selected
]),
239 Answers
= possible_answers(T
),
240 % finally we check to see if we have an answer
241 lists:member (Ids
, Answers
) and A
248 check_selection_test_ () ->
251 case lwes_stats:start_link() of
252 {error
,{already_started
,_
}} -> exists
;
257 (Pid
) -> unlink(Pid
), gen_server:stop(Pid
)
260 ?
_assert(test_one(T
))
261 || T
<- [basic
, random
, group
]
265 normalize_test_ () ->
267 ?
_assertEqual (Expected
, normalize_emitters_config(Given
))
270 { {"127.0.0.1",9191},
271 {1,group
,[{"127.0.0.1",9191}]} },
272 { {"127.0.0.1",9191,[{ttl
,25}]},
273 {1,group
,[{"127.0.0.1",9191,[{ttl
,25}]}]} },
274 { {2, [{"127.0.0.1",9191},{"127.0.0.1",9292}]},
275 {2,group
,[{"127.0.0.1",9191},{"127.0.0.1",9292}]} },
276 { {1, [{"127.0.0.1",9191},{"127.0.0.1",9292}]},
277 {1,group
,[{"127.0.0.1",9191},{"127.0.0.1",9292}]} },
278 { {2, random
, [{"127.0.0.1",30000},{"127.0.0.1",30001},{"127.0.0.1",30002}]},
279 {2, random
, [{"127.0.0.1",30000}, {"127.0.0.1",30001}, {"127.0.0.1",30002}]} },
280 { {3, group
, [{1,random
,[{"127.0.0.1",5301},{"127.0.0.1",5302}]},{1,random
,[{"127.0.0.1",5311},{"127.0.0.1",5312}]},{1,random
,[{"127.0.0.1",5321},{"127.0.0.1",5322}]}]},
281 {3, group
, [{1,random
,[{"127.0.0.1",5301},{"127.0.0.1",5302}]},{1,random
,[{"127.0.0.1",5311},{"127.0.0.1",5312}]},{1,random
,[{"127.0.0.1",5321},{"127.0.0.1",5322}]}]} },
282 { {2, group
, [{2, random
, [{"127.0.0.1",30000},{"127.0.0.1",30001},{"127.0.0.1",30002}]},{lwes_emitter_stdout
,[{label
,stdout
}]}]},
283 {2, group
, [{2, random
, [{"127.0.0.1",30000},{"127.0.0.1",30001},{"127.0.0.1",30002}]},{lwes_emitter_stdout
,[{label
,stdout
}]}]}
285 % TODO: support labels and sub-labels
286 % {"127.0.0.1",9191} ->
287 % {"127.0.0.1",9191,[{label, abc}]} ->
288 % {1, group, [{"127.0.0.1",9191}], abc}
290 % {2,group,[{"127.0.0.1",9191},{"127.0.0.1",9292}]} ->
291 % {2, group, [{"127.0.0.1",9191},{"127.0.0.1",9292}], abc}
293 % {2, group, [{2, random, [{"127.0.0.1",30000},
294 % {"127.0.0.1",30001},
295 % {"127.0.0.1",30002}]},
296 % {lwes_emitter_stdout,[{label,stdout}]}
299 % {2, group, [{2, random, [{"127.0.0.1",30000},
300 % {"127.0.0.1",30001},
301 % {"127.0.0.1",30002}], [{label,udp}]},
302 % {lwes_emitter_stdout,[{label,stdout}]}