2 %%% Light Weight Event System (LWES)
5 %%% Event0 = lwes_event:new ("MyEvent"),
6 %%% Event1 = lwes_event:set_uint16 (Event0, "MyUint16", 25),
8 %%% Emitting to a single channel
10 %%% {ok, Channel0} = lwes:open (emitter, {Ip, Port})
11 %%% Channel1 = lwes:emit (Channel0, Event1).
13 %%% Emit to several channels
15 %%% % emit to 1 of a set in a round robin fashion
16 %%% {ok, Channels0} = lwes:open (emitters, {1, [{Ip1,Port1},...{IpN,PortN}]})
17 %%% Channels1 = lwes:emit (Channels0, Event1)
18 %%% Channels2 = lwes:emit (Channels1, Event2)
20 %%% lwes:close (ChannelsN)
22 %%% % emit to 2 of a set in an m of n fashion (ie, emit to first 2 in list,
23 %%% % then 2nd and 3rd, then 3rd and 4th, etc., wraps at end of list)
24 %%% {ok, Channels0} = lwes:open (emitters, {2, [{Ip1,Port1},...{IpN,PortN}]})
26 %%% Listening via callback
28 %%% {ok, Channel} = lwes:open (listener, {Ip, Port})
29 %%% lwes:listen (Channel, Fun, Type, Accum).
31 %%% Fun is called for each event
35 %%% lwes:close (Channel)
39 -include_lib ("lwes.hrl").
40 -include ("lwes_internal.hrl").
44 open
/2, % (Type, Config) -> {ok, Channel}
51 enable_validation
/1 ]).
53 %%====================================================================
55 %%====================================================================
58 application:start (lwes
).
61 % open an lwes emitter, listener or set of emitters
63 % config for emitter/listener is
66 % config for emitters (aka, multi emitter) is, default strategy is queue
67 % for backward compatibility
68 % { NumberToSendToInThisGroup, [queue | random]
76 % config for groups is
77 % { NumberOfGroupsToSendTo,
80 % { NumberToSendToInThisGroup,
91 % an example group emission might be
104 % [ {IpN+1, PortN+1},
111 % which should send each event to one machine in each group
113 open (emitter
, Config
) ->
114 open (emitters
, Config
);
115 open (emitters
, Config
) ->
116 lwes_multi_emitter:new (Config
);
117 open (listener
, Config
) ->
118 try
lwes_channel:new (listener
, Config
) of
119 C
-> lwes_channel:open (C
)
121 _:_
-> { error
, bad_ip_port
}
126 % emit an array of events
127 emit (ChannelsIn
, []) ->
130 emit (ChannelsIn
, [ HeadEvent
= #lwes_event
{} | TailEvents
]) ->
131 ChannelsOut
= emit (ChannelsIn
, HeadEvent
),
132 emit (ChannelsOut
, TailEvents
);
134 % emit an event to one or more channels
135 emit (Channel
, Event
) when is_record (Channel
, lwes_channel
) ->
136 lwes_channel:send_to (Channel
, lwes_event:to_binary (Event
)),
137 % channel doesn't actually change for a single emitter
139 emit (StateIn
= {Module
,_
}, Event
) when is_atom (Module
) ->
140 Module:emit (StateIn
, Event
);
141 emit (Channels
, Event
) ->
142 lwes_multi_emitter:emit (Channels
, Event
),
145 % emit an event to one or more channels
146 emit (Channel
, Event
, SpecName
) ->
147 case lwes_esf_validator:validate (SpecName
, Event
) of
149 emit (Channel
, Event
);
151 error_logger:error_msg("Event validation error ~p", [Error
]),
157 % Callback function - function is called with an event in given format
158 % and the current state, it should return the next
163 % raw - callback is given raw udp structure, use lwes_event:from_udp to
165 % list - callback is given an #lwes_event record where the name is a
166 % binary, and the attributes is a proplist where keys are binaries,
167 % and values are either integers (for lwes int types), binaries
168 % (for lwes strings), true|false atoms (for lwes booleans),
169 % or 4-tuples (for lwes ip addresses)
170 % tagged - callback is given an #lwes_event record where the name is a
171 % binary, and the attributes are 3-tuples with the first element
172 % the type of data, the second the key as a binary and the
173 % third the values as in the list format
174 % dict - callback is given an #lwes_event record where the name is a
175 % binary, and the attributes are a dictionary with a binary
176 % key and value according to the type
177 % json - this returns a proplist instead of an #lwes_event record. The
178 % valuse are mostly the same as list, but ip addresses are strings
179 % (as binary). This should means you can pass the returned value
180 % to mochijson2:encode (or other json encoders), and have the event
182 % json_eep18 - uses the eep18 format of mochijson2 decode
183 % json_proplist - uses the proplist format of mochijson2 decode
185 % Initial State is whatever you want
186 listen (Channel
, CallbackFunction
, EventType
, CallbackInitialState
)
187 when is_function (CallbackFunction
, 2),
188 EventType
=:= raw
; EventType
=:= tagged
;
189 EventType
=:= list ; EventType
=:= dict
;
190 EventType
=:= json
; EventType
=:= json_proplist
;
191 EventType
=:= json_eep18
->
192 lwes_channel:register_callback (Channel
, CallbackFunction
,
193 EventType
, CallbackInitialState
).
195 % close the channel or channels
196 close (Channel
) when is_record (Channel
, lwes_channel
) ->
197 lwes_channel:close (Channel
);
198 close (StateIn
= {Module
,_
}) when is_atom (Module
) ->
199 Module:close (StateIn
);
201 lwes_multi_emitter:close (Channels
).
204 case application:get_application (lwes
) of
205 undefined
-> {error
, {not_started
, lwes
}};
207 lwes_stats:print(none
),
212 case application:get_application (lwes
) of
213 undefined
-> {error
, {not_started
, lwes
}};
215 lwes_stats:rollup(none
)
219 % enable validation of the events sent via this LWES client
220 % against the specification (ESF)
222 % ESFInfo - a list of tuples of the form { Name, FilePath }
224 % Example : [{Name1, path1}, {Name2, path2}]
226 % 'Name' is used to match the 'event' with a particular
229 % 'FilePath' is the path to the ESF File
230 enable_validation (ESFInfo
) ->
232 fun ({ESF
, File
}) -> lwes_esf_validator:add_esf (ESF
, File
) end,
235 %%====================================================================
236 %% Internal functions
237 %%====================================================================
239 %%====================================================================
241 %%====================================================================
243 -include_lib ("eunit/include/eunit.hrl").
245 -define(TEST_TABLE
, lwes_test
).
248 ok
= application:start(lwes
),
249 ets:new(?TEST_TABLE
,[named_table
,public
]),
253 ets:delete(?TEST_TABLE
),
254 application:stop(lwes
).
256 build_one (EventsToSend
, PerfectPercent
,
257 EmitterConfig
, EmitterType
, ListenerConfigs
) ->
261 {ok
, L
} = open(listener
, LC
),
263 fun({udp
,_
,_
,_
,E
},A
) ->
264 [{E
}] = ets:lookup(?TEST_TABLE
, E
),
270 || LC
<- ListenerConfigs
272 {ok
, Emitter0
} = open(EmitterType
,EmitterConfig
),
274 InitialEvent
= lwes_event:new("foo"),
277 fun (C
, EmitterIn
) ->
278 EventWithCounter
= lwes_event:set_uint16(InitialEvent
,"bar",C
),
279 Event
= lwes_event:to_binary(EventWithCounter
),
280 ets:insert(?TEST_TABLE
, {Event
}),
281 emit(EmitterIn
, Event
)
284 lists:seq(1,EventsToSend
)
288 [ close(L
) || L
<- Listeners
],
290 Rollup
= lwes_stats:rollup(none
),
292 lists:foldl (fun ([_
,S
,R
,_
,_
,_
],{AS
,AR
}) ->
293 % calculated the actual percent received
294 Percent
= S
/EventsToSend
*100,
295 % and then check if the difference is within
296 % 15 percent which is about what I was seeing
298 WithinBound
= abs(PerfectPercent
-Percent
) < 15,
302 ?
debugFmt("WARNING: out of bounds : "
303 "abs(~p-~p) < 15 => ~p~n",
304 [PerfectPercent
, Percent
, WithinBound
]),
307 ?
assert(WithinBound
),
313 ?
assertEqual(Sent
, Received
)
319 EmitterConfig
= {"127.0.0.1",12321},
320 EmitterType
= emitter
,
321 ListenerConfigs
= [ EmitterConfig
],
322 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
327 build_one (EventsToSend
, PerfectPercent
,
328 EmitterConfig
, EmitterType
, ListenerConfigs
)
332 mondemand_w_ttl_test_ () ->
333 Address
= {"127.0.0.1",12321, [{ttl
,25}]},
336 EmitterConfig
= {NumberToSendTo
, [Address
]},
337 EmitterType
= emitters
,
338 ListenerConfigs
= [ Address
],
339 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
344 build_one (EventsToSend
, PerfectPercent
,
345 EmitterConfig
, EmitterType
, ListenerConfigs
)
350 mondemand_legacy_test_ () ->
351 Address
= {"127.0.0.1",12321},
354 % mondemand used the queue form which I got rid of, now I'm converting
355 % those to group configs, so the following should work
356 EmitterConfig
= {NumberToSendTo
, [Address
]},
357 EmitterType
= emitters
,
358 ListenerConfigs
= [ Address
],
359 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
364 build_one (EventsToSend
, PerfectPercent
,
365 EmitterConfig
, EmitterType
, ListenerConfigs
)
369 multi_random_test_ () ->
371 ListenerConfigs
= [ {"127.0.0.1", 30000},
372 {"127.0.0.1", 30001},
375 EmitterConfig
= { NumberToSendTo
, random
, ListenerConfigs
},
376 EmitterType
= emitters
,
378 % calculate the perfect number of events per listener we should get
380 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
385 build_one (EventsToSend
, PerfectPercent
,
386 EmitterConfig
, EmitterType
, ListenerConfigs
)
390 grouped_random_test_ () ->
392 Group1Config
= [ { "127.0.0.1", 5301 },
393 { "127.0.0.1", 5302 }
395 Group2Config
= [ { "127.0.0.1", 5303 },
396 { "127.0.0.1", 5304 }
398 Group3Config
= [ { "127.0.0.1", 5305 },
399 { "127.0.0.1", 5306 }
401 ListenerConfigs
= Group1Config
++ Group2Config
++ Group3Config
,
402 EmitterConfig
= { NumberToSendTo
, group
,
404 { 1, random
, Group1Config
},
405 { 1, random
, Group2Config
},
406 { 1, random
, Group3Config
}
409 EmitterType
= emitters
,
411 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
416 build_one (EventsToSend
, PerfectPercent
,
417 EmitterConfig
, EmitterType
, ListenerConfigs
)