restructure to be a littel more flexible
[lwes-erlang/github-mirror.git] / src / lwes.erl
blob7ea340e4fd7e3a0dc4c4fd8b10e765fab594dd0f
1 %%%
2 %%% Light Weight Event System (LWES)
3 %%%
4 %%% Creating Events
5 %%% Event0 = lwes_event:new ("MyEvent"),
6 %%% Event1 = lwes_event:set_uint16 (Event0, "MyUint16", 25),
7 %%%
8 %%% Emitting to a single channel
9 %%%
10 %%% {ok, Channel0} = lwes:open (emitter, {Ip, Port})
11 %%% Channel1 = lwes:emit (Channel0, Event1).
12 %%%
13 %%% Emit to several channels
14 %%%
15 %%% % emit to 1 of a set in a round robin fashion
16 %%% {ok, Channels0} = lwes:open (emitters, {1, [{Ip1,Port1},...{IpN,PortN}]})
17 %%% Channels1 = lwes:emit (Channels0, Event1)
18 %%% Channels2 = lwes:emit (Channels1, Event2)
19 %%% ...
20 %%% lwes:close (ChannelsN)
21 %%%
22 %%% % emit to 2 of a set in an m of n fashion (ie, emit to first 2 in list,
23 %%% % then 2nd and 3rd, then 3rd and 4th, etc., wraps at end of list)
24 %%% {ok, Channels0} = lwes:open (emitters, {2, [{Ip1,Port1},...{IpN,PortN}]})
25 %%%
26 %%% Listening via callback
27 %%%
28 %%% {ok, Channel} = lwes:open (listener, {Ip, Port})
29 %%% lwes:listen (Channel, Fun, Type, Accum).
30 %%%
31 %%% Fun is called for each event
32 %%%
33 %%% Closing channel
34 %%%
35 %%% lwes:close (Channel)
37 -module (lwes).
39 -include_lib ("lwes.hrl").
40 -include ("lwes_internal.hrl").
42 %% API
43 -export ([ start/0,
44 open/2, % (Type, Config) -> {ok, Channel}
45 emit/2,
46 emit/3,
47 listen/4,
48 close/1,
49 stats/0,
50 stats_raw/0,
51 enable_validation/1 ]).
53 %%====================================================================
54 %% API functions
55 %%====================================================================
57 start () ->
58 application:start (lwes).
61 % open an lwes emitter, listener or set of emitters
63 % config for emitter/listener is
64 % { Ip, Port }
66 % config for emitters (aka, multi emitter) is, default strategy is queue
67 % for backward compatibility
68 % { NumberToSendToInThisGroup, [queue | random]
69 % [
70 % {Ip0,Port0},
71 % ...,
72 % {IpN,PortN}
73 % ]
74 % }
76 % config for groups is
77 % { NumberOfGroupsToSendTo,
78 % group,
79 % [
80 % { NumberToSendToInThisGroup,
81 % Type,
82 % [
83 % {Ip0,Port0},
84 % ...
85 % {IpN,PortN}
86 % ]
87 % },
88 % ...
89 % ]
90 % }
91 % an example group emission might be
92 % { 2,
93 % group,
94 % [
95 % { 1,
96 % random,
97 % [ {Ip0, Port0},
98 % ...
99 % {IpN, PortN}
101 % },
102 % { 1,
103 % random,
104 % [ {IpN+1, PortN+1},
105 % ...
106 % {IpN+M, PortN+M}
111 % which should send each event to one machine in each group
113 open (emitter, Config) ->
114 open (emitters, Config);
115 open (emitters, Config) ->
116 lwes_multi_emitter:new (Config);
117 open (listener, Config) ->
118 try lwes_channel:new (listener, Config) of
119 C -> lwes_channel:open (C)
120 catch
121 _:_ -> { error, bad_ip_port }
122 end;
123 open (_, _) ->
124 { error, bad_type }.
126 % emit an array of events
127 emit (ChannelsIn, []) ->
128 ChannelsIn;
130 emit (ChannelsIn, [ HeadEvent = #lwes_event{} | TailEvents]) ->
131 ChannelsOut = emit (ChannelsIn, HeadEvent),
132 emit (ChannelsOut, TailEvents);
134 % emit an event to one or more channels
135 emit (Channel, Event) when is_record (Channel, lwes_channel) ->
136 lwes_channel:send_to (Channel, lwes_event:to_binary (Event)),
137 % channel doesn't actually change for a single emitter
138 Channel;
139 emit (StateIn = {Module,_}, Event) when is_atom (Module) ->
140 Module:emit (StateIn, Event);
141 emit (Channels, Event) ->
142 lwes_multi_emitter:emit (Channels, Event),
143 Channels.
145 % emit an event to one or more channels
146 emit (Channel, Event, SpecName) ->
147 case lwes_esf_validator:validate (SpecName, Event) of
148 ok ->
149 emit (Channel, Event);
150 {error, Error} ->
151 error_logger:error_msg("Event validation error ~p", [Error]),
152 Channel
153 end.
155 % listen for events
157 % Callback function - function is called with an event in given format
158 % and the current state, it should return the next
159 % state
161 % Type is one of
163 % raw - callback is given raw udp structure, use lwes_event:from_udp to
164 % turn into event
165 % list - callback is given an #lwes_event record where the name is a
166 % binary, and the attributes is a proplist where keys are binaries,
167 % and values are either integers (for lwes int types), binaries
168 % (for lwes strings), true|false atoms (for lwes booleans),
169 % or 4-tuples (for lwes ip addresses)
170 % tagged - callback is given an #lwes_event record where the name is a
171 % binary, and the attributes are 3-tuples with the first element
172 % the type of data, the second the key as a binary and the
173 % third the values as in the list format
174 % dict - callback is given an #lwes_event record where the name is a
175 % binary, and the attributes are a dictionary with a binary
176 % key and value according to the type
177 % json - this returns a proplist instead of an #lwes_event record. The
178 % valuse are mostly the same as list, but ip addresses are strings
179 % (as binary). This should means you can pass the returned value
180 % to mochijson2:encode (or other json encoders), and have the event
181 % as a json document
182 % json_eep18 - uses the eep18 format of mochijson2 decode
183 % json_proplist - uses the proplist format of mochijson2 decode
185 % Initial State is whatever you want
186 listen (Channel, CallbackFunction, EventType, CallbackInitialState)
187 when is_function (CallbackFunction, 2),
188 EventType =:= raw ; EventType =:= tagged ;
189 EventType =:= list ; EventType =:= dict ;
190 EventType =:= json ; EventType =:= json_proplist ;
191 EventType =:= json_eep18 ->
192 lwes_channel:register_callback (Channel, CallbackFunction,
193 EventType, CallbackInitialState).
195 % close the channel or channels
196 close (Channel) when is_record (Channel, lwes_channel) ->
197 lwes_channel:close (Channel);
198 close (StateIn = {Module,_}) when is_atom (Module) ->
199 Module:close (StateIn);
200 close (Channels) ->
201 lwes_multi_emitter:close (Channels).
203 stats () ->
204 case application:get_application (lwes) of
205 undefined -> {error, {not_started, lwes}};
206 _ ->
207 lwes_stats:print(none),
209 end.
211 stats_raw () ->
212 case application:get_application (lwes) of
213 undefined -> {error, {not_started, lwes}};
214 _ ->
215 lwes_stats:rollup(none)
216 end.
219 % enable validation of the events sent via this LWES client
220 % against the specification (ESF)
222 % ESFInfo - a list of tuples of the form { Name, FilePath }
224 % Example : [{Name1, path1}, {Name2, path2}]
226 % 'Name' is used to match the 'event' with a particular
227 % ESF File
229 % 'FilePath' is the path to the ESF File
230 enable_validation (ESFInfo) ->
231 lists:foreach (
232 fun ({ESF, File}) -> lwes_esf_validator:add_esf (ESF, File) end,
233 ESFInfo).
235 %%====================================================================
236 %% Internal functions
237 %%====================================================================
239 %%====================================================================
240 %% Test functions
241 %%====================================================================
242 -ifdef (TEST).
243 -include_lib ("eunit/include/eunit.hrl").
245 -define(TEST_TABLE, lwes_test).
247 setup() ->
248 ok = application:start(lwes),
249 ets:new(?TEST_TABLE,[named_table,public]),
252 teardown(ok) ->
253 ets:delete(?TEST_TABLE),
254 application:stop(lwes).
256 build_one (EventsToSend, PerfectPercent,
257 EmitterConfig, EmitterType, ListenerConfigs) ->
258 fun() ->
259 Listeners =
260 [ begin
261 {ok, L} = open(listener, LC),
262 listen (L,
263 fun({udp,_,_,_,E},A) ->
264 [{E}] = ets:lookup(?TEST_TABLE, E),
266 end,
267 raw, ok),
270 || LC <- ListenerConfigs
272 {ok, Emitter0} = open(EmitterType,EmitterConfig),
274 InitialEvent = lwes_event:new("foo"),
275 EmitterFinal =
276 lists:foldl(
277 fun (C, EmitterIn) ->
278 EventWithCounter = lwes_event:set_uint16(InitialEvent,"bar",C),
279 Event = lwes_event:to_binary(EventWithCounter),
280 ets:insert(?TEST_TABLE, {Event}),
281 emit(EmitterIn, Event)
282 end,
283 Emitter0,
284 lists:seq(1,EventsToSend)
286 timer:sleep(500),
287 close(EmitterFinal),
288 [ close(L) || L <- Listeners ],
290 Rollup = lwes_stats:rollup(none),
291 {Sent,Received} =
292 lists:foldl (fun ([_,S,R,_,_,_],{AS,AR}) ->
293 % calculated the actual percent received
294 Percent = S/EventsToSend*100,
295 % and then check if the difference is within
296 % 15 percent which is about what I was seeing
297 % while testing
298 WithinBound = abs(PerfectPercent-Percent) < 15,
299 case WithinBound of
300 true -> ok;
301 false ->
302 ?debugFmt("WARNING: out of bounds : "
303 "abs(~p-~p) < 15 => ~p~n",
304 [PerfectPercent, Percent, WithinBound]),
306 end,
307 ?assert(WithinBound),
308 {AS + S, AR + R}
309 end,
310 {0,0},
311 Rollup
313 ?assertEqual(Sent, Received)
314 end.
316 simple_test_ () ->
317 NumberToSendTo = 1,
318 EventsToSend = 100,
319 EmitterConfig = {"127.0.0.1",12321},
320 EmitterType = emitter,
321 ListenerConfigs = [ EmitterConfig ],
322 PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
323 { setup,
324 fun setup/0,
325 fun teardown/1,
327 build_one (EventsToSend, PerfectPercent,
328 EmitterConfig, EmitterType, ListenerConfigs)
332 mondemand_w_ttl_test_ () ->
333 Address = {"127.0.0.1",12321, [{ttl,25}]},
334 NumberToSendTo = 1,
335 EventsToSend = 100,
336 EmitterConfig = {NumberToSendTo, [Address]},
337 EmitterType = emitters,
338 ListenerConfigs = [ Address ],
339 PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
340 { setup,
341 fun setup/0,
342 fun teardown/1,
344 build_one (EventsToSend, PerfectPercent,
345 EmitterConfig, EmitterType, ListenerConfigs)
350 mondemand_legacy_test_ () ->
351 Address = {"127.0.0.1",12321},
352 NumberToSendTo = 1,
353 EventsToSend = 100,
354 % mondemand used the queue form which I got rid of, now I'm converting
355 % those to group configs, so the following should work
356 EmitterConfig = {NumberToSendTo, [Address]},
357 EmitterType = emitters,
358 ListenerConfigs = [ Address ],
359 PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
360 { setup,
361 fun setup/0,
362 fun teardown/1,
364 build_one (EventsToSend, PerfectPercent,
365 EmitterConfig, EmitterType, ListenerConfigs)
369 multi_random_test_ () ->
370 NumberToSendTo = 2,
371 ListenerConfigs = [ {"127.0.0.1", 30000},
372 {"127.0.0.1", 30001},
373 {"127.0.0.1", 30002}
375 EmitterConfig = { NumberToSendTo, random, ListenerConfigs },
376 EmitterType = emitters,
377 EventsToSend = 100,
378 % calculate the perfect number of events per listener we should get
379 % this is used below
380 PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
381 { setup,
382 fun setup/0,
383 fun teardown/1,
385 build_one (EventsToSend, PerfectPercent,
386 EmitterConfig, EmitterType, ListenerConfigs)
390 grouped_random_test_ () ->
391 NumberToSendTo = 3,
392 Group1Config = [ { "127.0.0.1", 5301 },
393 { "127.0.0.1", 5302 }
395 Group2Config = [ { "127.0.0.1", 5303 },
396 { "127.0.0.1", 5304 }
398 Group3Config = [ { "127.0.0.1", 5305 },
399 { "127.0.0.1", 5306 }
401 ListenerConfigs = Group1Config ++ Group2Config ++ Group3Config,
402 EmitterConfig = { NumberToSendTo, group,
404 { 1, random, Group1Config },
405 { 1, random, Group2Config },
406 { 1, random, Group3Config }
409 EmitterType = emitters,
410 EventsToSend = 100,
411 PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
412 { setup,
413 fun setup/0,
414 fun teardown/1,
416 build_one (EventsToSend, PerfectPercent,
417 EmitterConfig, EmitterType, ListenerConfigs)
422 -endif.