Merge pull request #14 from lwes/pluggable-emission
[lwes-erlang/github-mirror.git] / src / lwes.erl
blob1641f462dd409099183e29fc5b9ea1bb9199e2fe
1 %%%
2 %%% Light Weight Event System (LWES)
3 %%%
4 %%% Creating Events
5 %%% Event0 = lwes_event:new ("MyEvent"),
6 %%% Event1 = lwes_event:set_uint16 (Event0, "MyUint16", 25),
7 %%%
8 %%% Emitting to a single channel
9 %%%
10 %%% {ok, Channel0} = lwes:open (emitter, {Ip, Port})
11 %%% Channel1 = lwes:emit (Channel0, Event1).
12 %%%
13 %%% Emit to several channels
14 %%%
15 %%% % emit to 1 of a set in a round robin fashion
16 %%% {ok, Channels0} = lwes:open (emitters, {1, [{Ip1,Port1},...{IpN,PortN}]})
17 %%% Channels1 = lwes:emit (Channels0, Event1)
18 %%% Channels2 = lwes:emit (Channels1, Event2)
19 %%% ...
20 %%% lwes:close (ChannelsN)
21 %%%
22 %%% % emit to 2 of a set in an m of n fashion (ie, emit to first 2 in list,
23 %%% % then 2nd and 3rd, then 3rd and 4th, etc., wraps at end of list)
24 %%% {ok, Channels0} = lwes:open (emitters, {2, [{Ip1,Port1},...{IpN,PortN}]})
25 %%%
26 %%% Listening via callback
27 %%%
28 %%% {ok, Channel} = lwes:open (listener, {Ip, Port})
29 %%% lwes:listen (Channel, Fun, Type, Accum).
30 %%%
31 %%% Fun is called for each event
32 %%%
33 %%% Closing channel
34 %%%
35 %%% lwes:close (Channel)
37 -module (lwes).
39 -include_lib ("lwes.hrl").
40 -include ("lwes_internal.hrl").
42 %% API
43 -export ([ start/0,
44 open/2, % (Type, Config) -> {ok, Channel}
45 emit/2,
46 emit/3,
47 listen/4,
48 close/1,
49 stats/0,
50 stats_raw/0,
51 enable_validation/1 ]).
53 %%====================================================================
54 %% API functions
55 %%====================================================================
% start the lwes application as a temporary application (the restart
% type application:start/1 uses); returns whatever the application
% controller returns
start () ->
  application:start (lwes, temporary).
% open an lwes emitter, a set of emitters, or a listener
%
% Type is one of
%
%   emitter | emitters - Config is passed to lwes_multi_emitter:new/1 and
%                        its result is returned as is
%   listener           - Config is passed to lwes_channel:new/2 and the
%                        resulting channel is opened with
%                        lwes_channel:open/1
%
% config for a single emitter or a listener is
%   { Ip, Port }
%
% config for groups is
%   { NumberOfGroupsToSendTo,
%     group,
%     [
%       { NumberToSendToInThisGroup,
%         Type,
%         [
%           {Ip0,Port0},
%           ...
%           {IpN,PortN}
%         ]
%       },
%       ...
%     ]
%   }
%
% an example group emission might be
%   { 2,
%     group,
%     [
%       { 1,
%         random,
%         [ {Ip0, Port0},
%           ...
%           {IpN, PortN}
%         ]
%       },
%       { 1,
%         random,
%         [ {IpN+1, PortN+1},
%           ...
%           {IpN+M, PortN+M}
%         ]
%       }
%     ]
%   }
% which should send each event to one machine in each group
%
% returns { error, bad_ip_port } when lwes_channel:new/2 raises for a
% listener, and { error, bad_type } for any other Type
open (Type, Config) when Type =:= emitter ; Type =:= emitters ->
  lwes_multi_emitter:new (Config);
open (listener, Config) ->
  % only the construction is protected, failures while opening the
  % channel are returned/raised by lwes_channel:open/1 unchanged
  try lwes_channel:new (listener, Config) of
    Listener -> lwes_channel:open (Listener)
  catch
    _:_ -> { error, bad_ip_port }
  end;
open (_, _) ->
  { error, bad_type }.
% emit an event, or a list of events, and return the handle to use for
% the next emission

% nothing left to emit, hand back the handle untouched
emit (Handle, []) ->
  Handle;
% a list of #lwes_event{} records is emitted front to back, threading the
% handle returned by each emission into the next one
emit (Handle, [ First = #lwes_event{} | Rest ]) ->
  emit (emit (Handle, First), Rest);
% a single channel, the event is serialized and sent directly
emit (Channel = #lwes_channel{}, Event) ->
  Packet = lwes_event:to_binary (Event),
  lwes_channel:send_to (Channel, Packet),
  % channel doesn't actually change for a single emitter
  Channel;
% pluggable emission, the first element of the state names the module
% which does the emitting and which returns the next state
emit (State = {Module, _}, Event) when is_atom (Module) ->
  Module:emit (State, Event);
% anything else is handed to lwes_multi_emitter, the handle is returned
% as it was given
emit (Channels, Event) ->
  lwes_multi_emitter:emit (Channels, Event),
  Channels.
% validate an event against the ESF registered under SpecName and emit it
% only if validation passes
%
% on a validation error the error is logged, nothing is emitted and the
% channel is returned as it was passed in
emit (Channel, Event, SpecName) ->
  Verdict = lwes_esf_validator:validate (SpecName, Event),
  case Verdict of
    {error, Reason} ->
      error_logger:error_msg("Event validation error ~p", [Reason]),
      Channel;
    ok ->
      emit (Channel, Event)
  end.
% listen for events
%
% Callback function - function of arity 2 which is called with an event in
%                     the given format and the current state, it should
%                     return the next state
%
% Type is one of
%
%   raw - callback is given raw udp structure, use lwes_event:from_udp to
%         turn into event
%   list - callback is given an #lwes_event record where the name is a
%          binary, and the attributes is a proplist where keys are binaries,
%          and values are either integers (for lwes int types), binaries
%          (for lwes strings), true|false atoms (for lwes booleans),
%          or 4-tuples (for lwes ip addresses)
%   tagged - callback is given an #lwes_event record where the name is a
%            binary, and the attributes are 3-tuples with the first element
%            the type of data, the second the key as a binary and the
%            third the values as in the list format
%   dict - callback is given an #lwes_event record where the name is a
%          binary, and the attributes are a dictionary with a binary
%          key and value according to the type
%   json - this returns a proplist instead of an #lwes_event record.  The
%          values are mostly the same as list, but ip addresses are strings
%          (as binary).  This should mean you can pass the returned value
%          to mochijson2:encode (or other json encoders), and have the event
%          as a json document
%   json_eep18 - uses the eep18 format of mochijson2 decode
%   json_proplist - uses the proplist format of mochijson2 decode
%
% Initial State is whatever you want
%
% Any other Type, or a callback which is not a fun of arity 2, fails with
% function_clause.
listen (Channel, CallbackFunction, EventType, CallbackInitialState)
  % ';' separates alternative guard sequences, so the arity check has to be
  % combined with the whole set of types in a single guard expression,
  % otherwise it would only be applied to the first alternative
  when is_function (CallbackFunction, 2) andalso
       (EventType =:= raw orelse EventType =:= tagged orelse
        EventType =:= list orelse EventType =:= dict orelse
        EventType =:= json orelse EventType =:= json_proplist orelse
        EventType =:= json_eep18) ->
  lwes_channel:register_callback (Channel, CallbackFunction,
                                  EventType, CallbackInitialState).
% close whatever open/2 or emit/2 handed back
%
% a single channel is closed by lwes_channel, a { Module, State } pair is
% closed by Module, and everything else is closed by lwes_multi_emitter
close (Channel = #lwes_channel{}) ->
  lwes_channel:close (Channel);
close (State = {Module, _}) when is_atom (Module) ->
  Module:close (State);
close (Channels) ->
  lwes_multi_emitter:close (Channels).
% print the current stats via lwes_stats:print/1
%
% returns ok once printed, or {error, {not_started, lwes}} when
% application:get_application/1 does not know of lwes
stats () ->
  case application:get_application (lwes) of
    undefined -> {error, {not_started, lwes}};
    _ ->
      lwes_stats:print(none),
      % the result of print is not interesting to callers
      ok
  end.
% return the stats as produced by lwes_stats:rollup/1 instead of printing
% them, or {error, {not_started, lwes}} when application:get_application/1
% does not know of lwes
stats_raw () ->
  Owner = application:get_application (lwes),
  case Owner of
    undefined ->
      {error, {not_started, lwes}};
    _ ->
      lwes_stats:rollup(none)
  end.
% enable validation of the events sent via this LWES client
% against the specification (ESF)
%
% ESFInfo - a list of tuples of the form { Name, FilePath }
%
%   Example : [{Name1, path1}, {Name2, path2}]
%
%   'Name' is used to match the 'event' with a particular
%   ESF File
%
%   'FilePath' is the path to the ESF File
%
% each pair is registered in list order with lwes_esf_validator:add_esf/2
enable_validation (ESFInfo) ->
  Register =
    fun ({Name, FilePath}) ->
      lwes_esf_validator:add_esf (Name, FilePath)
    end,
  lists:foreach (Register, ESFInfo).
225 %%====================================================================
226 %% Internal functions
227 %%====================================================================
229 %%====================================================================
230 %% Test functions
231 %%====================================================================
232 -ifdef (TEST).
233 -include_lib ("eunit/include/eunit.hrl").
235 -define(TEST_TABLE, lwes_test).
% fixture setup: start lwes and create the public named ets table the
% tests use to remember which packets were emitted
%
% returns ok, which is the value teardown/1 matches on
setup() ->
  ok = application:start(lwes),
  ets:new(?TEST_TABLE,[named_table,public]),
  ok.
% fixture cleanup: called with the ok returned by setup/0, drops the test
% table and then stops lwes
teardown (ok) ->
  ets:delete (?TEST_TABLE),
  application:stop (lwes).
% build the body of one emit/listen round trip test
%
%   EventsToSend    - number of events to emit
%   PerfectPercent  - value each stats row is expected to be within 15 of
%   EmitterConfig   - config passed to open/2 for the emitting side
%   EmitterType     - type passed to open/2 for the emitting side
%   ListenerConfigs - list of configs, one listener is opened for each
%
% returns a fun of arity 0, suitable as a test inside an eunit fixture
build_one (EventsToSend, PerfectPercent,
           EmitterConfig, EmitterType, ListenerConfigs) ->
  fun() ->
    % open one raw listener per config, the callback checks that every
    % packet it is handed was recorded in the test table before emission
    Listeners =
      [ begin
          {ok, L} = open(listener, LC),
          listen (L,
                  fun({udp,_,_,_,E},A) ->
                    [{E}] = ets:lookup(?TEST_TABLE, E),
                    A
                  end,
                  raw, ok),
          L
        end
        || LC <- ListenerConfigs
      ],

    {ok, Emitter0} = open(EmitterType,EmitterConfig),

    % emit EventsToSend events which differ only in the "bar" counter,
    % threading the emitter returned by each emit into the next one
    InitialEvent = lwes_event:new("foo"),
    EmitterFinal =
      lists:foldl(
        fun (C, EmitterIn) ->
          EventWithCounter = lwes_event:set_uint16(InitialEvent,"bar",C),
          Event = lwes_event:to_binary(EventWithCounter),
          ets:insert(?TEST_TABLE, {Event}),
          emit(EmitterIn, Event)
        end,
        Emitter0,
        lists:seq(1,EventsToSend)
      ),
    % give the listeners some time to receive what was emitted
    timer:sleep(500),
    close(EmitterFinal),
    [ close(L) || L <- Listeners ],

    % NOTE(review): S and R are presumably the sent and received counts of
    % a stats row - confirm against lwes_stats:rollup/1
    Rollup = lwes_stats:rollup(none),
    {Sent,Received} =
      lists:foldl (fun ([_,S,R,_,_,_],{AS,AR}) ->
                     % calculated the actual percent received
                     Percent = S/EventsToSend*100,
                     % and then check if the difference is within
                     % 15 percent which is about what I was seeing
                     % while testing
                     WithinBound = abs(PerfectPercent-Percent) < 15,
                     case WithinBound of
                       true -> ok;
                       false ->
                         ?debugFmt("WARNING: out of bounds : "
                                   "abs(~p-~p) < 15 => ~p~n",
                                   [PerfectPercent, Percent, WithinBound]),
                         ok
                     end,
                     ?assert(WithinBound),
                     {AS + S, AR + R}
                   end,
                   {0,0},
                   Rollup
                  ),
    ?assertEqual(Sent, Received)
  end.
% one emitter sending to one listener on the same address and port
simple_test_ () ->
  NumberToSendTo = 1,
  EventsToSend = 100,
  EmitterConfig = {"127.0.0.1",12321},
  EmitterType = emitter,
  ListenerConfigs = [ EmitterConfig ],
  PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
  { setup,
    fun setup/0,
    fun teardown/1,
    [
      build_one (EventsToSend, PerfectPercent,
                 EmitterConfig, EmitterType, ListenerConfigs)
    ]
  }.
% emitters config whose single address carries an options list with a ttl,
% the same 3-tuple is used to open the listener
mondemand_w_ttl_test_ () ->
  Address = {"127.0.0.1",12321, [{ttl,25}]},
  NumberToSendTo = 1,
  EventsToSend = 100,
  EmitterConfig = {NumberToSendTo, [Address]},
  EmitterType = emitters,
  ListenerConfigs = [ Address ],
  PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
  { setup,
    fun setup/0,
    fun teardown/1,
    [
      build_one (EventsToSend, PerfectPercent,
                 EmitterConfig, EmitterType, ListenerConfigs)
    ]
  }.
% emitters config in the two element { Number, Addresses } form
mondemand_legacy_test_ () ->
  Address = {"127.0.0.1",12321},
  NumberToSendTo = 1,
  EventsToSend = 100,
  % mondemand used the queue form which I got rid of, now I'm converting
  % those to group configs, so the following should work
  EmitterConfig = {NumberToSendTo, [Address]},
  EmitterType = emitters,
  ListenerConfigs = [ Address ],
  PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
  { setup,
    fun setup/0,
    fun teardown/1,
    [
      build_one (EventsToSend, PerfectPercent,
                 EmitterConfig, EmitterType, ListenerConfigs)
    ]
  }.
% random emission to 2 out of 3 listeners
multi_random_test_ () ->
  NumberToSendTo = 2,
  ListenerConfigs = [ {"127.0.0.1", 30000},
                      {"127.0.0.1", 30001},
                      {"127.0.0.1", 30002}
                    ],
  EmitterConfig = { NumberToSendTo, random, ListenerConfigs },
  EmitterType = emitters,
  EventsToSend = 100,
  % calculate the perfect number of events per listener we should get
  % this is used below
  PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
  { setup,
    fun setup/0,
    fun teardown/1,
    [
      build_one (EventsToSend, PerfectPercent,
                 EmitterConfig, EmitterType, ListenerConfigs)
    ]
  }.
% group emission over 3 groups of 2 listeners each, with 1 random member
% of every group configured per event
grouped_random_test_ () ->
  NumberToSendTo = 3,
  Group1Config = [ { "127.0.0.1", 5301 },
                   { "127.0.0.1", 5302 }
                 ],
  Group2Config = [ { "127.0.0.1", 5303 },
                   { "127.0.0.1", 5304 }
                 ],
  Group3Config = [ { "127.0.0.1", 5305 },
                   { "127.0.0.1", 5306 }
                 ],
  ListenerConfigs = Group1Config ++ Group2Config ++ Group3Config,
  EmitterConfig = { NumberToSendTo, group,
                    [
                      { 1, random, Group1Config },
                      { 1, random, Group2Config },
                      { 1, random, Group3Config }
                    ]
                  },
  EmitterType = emitters,
  EventsToSend = 100,
  PerfectPercent = EventsToSend / length(ListenerConfigs) * NumberToSendTo,
  { setup,
    fun setup/0,
    fun teardown/1,
    [
      build_one (EventsToSend, PerfectPercent,
                 EmitterConfig, EmitterType, ListenerConfigs)
    ]
  }.
412 -endif.