2 %%% Light Weight Event System (LWES)
5 %%% Event0 = lwes_event:new ("MyEvent"),
6 %%% Event1 = lwes_event:set_uint16 (Event0, "MyUint16", 25),
8 %%% Emitting to a single channel
10 %%% {ok, Channel0} = lwes:open (emitter, {Ip, Port})
11 %%% Channel1 = lwes:emit (Channel0, Event1).
13 %%% Emit to several channels
15 %%% % emit to 1 of a set in a round robin fashion
16 %%% {ok, Channels0} = lwes:open (emitters, {1, [{Ip1,Port1},...{IpN,PortN}]})
17 %%% Channels1 = lwes:emit (Channels0, Event1)
18 %%% Channels2 = lwes:emit (Channels1, Event2)
20 %%% lwes:close (ChannelsN)
22 %%% % emit to 2 of a set in an m of n fashion (ie, emit to first 2 in list,
23 %%% % then 2nd and 3rd, then 3rd and 4th, etc., wraps at end of list)
24 %%% {ok, Channels0} = lwes:open (emitters, {2, [{Ip1,Port1},...{IpN,PortN}]})
26 %%% Listening via callback
28 %%% {ok, Channel} = lwes:open (listener, {Ip, Port})
29 %%% lwes:listen (Channel, Fun, Type, Accum).
31 %%% Fun is called for each event
35 %%% lwes:close (Channel)
39 -include_lib ("lwes.hrl").
40 -include ("lwes_internal.hrl").
44 open
/2, % (Type, Config) -> {ok, Channel}
51 enable_validation
/1 ]).
53 %%====================================================================
55 %%====================================================================
58 application:start (lwes
).
61 % open an lwes emitter, listener or set of emitters
63 % config for emitter/listener is
66 % config for groups is
67 % { NumberOfGroupsToSendTo,
70 % { NumberToSendToInThisGroup,
81 % an example group emission might be
101 % which should send each event to one machine in each group
103 open (emitter
, Config
) ->
104 open (emitters
, Config
);
105 open (emitters
, Config
) ->
106 lwes_multi_emitter:new (Config
);
107 open (listener
, Config
) ->
108 try
lwes_channel:new (listener
, Config
) of
109 C
-> lwes_channel:open (C
)
111 _:_
-> { error
, bad_ip_port
}
116 % emit an array of events
117 emit (ChannelsIn
, []) ->
120 emit (ChannelsIn
, [ HeadEvent
= #lwes_event
{} | TailEvents
]) ->
121 ChannelsOut
= emit (ChannelsIn
, HeadEvent
),
122 emit (ChannelsOut
, TailEvents
);
124 % emit an event to one or more channels
125 emit (Channel
, Event
) when is_record (Channel
, lwes_channel
) ->
126 lwes_channel:send_to (Channel
, lwes_event:to_binary (Event
)),
127 % channel doesn't actually change for a single emitter
129 emit (StateIn
= {Module
,_
}, Event
) when is_atom (Module
) ->
130 Module:emit (StateIn
, Event
);
131 emit (Channels
, Event
) ->
132 lwes_multi_emitter:emit (Channels
, Event
),
135 % emit an event to one or more channels
136 emit (Channel
, Event
, SpecName
) ->
137 case lwes_esf_validator:validate (SpecName
, Event
) of
139 emit (Channel
, Event
);
141 error_logger:error_msg("Event validation error ~p", [Error
]),
147 % Callback function - function is called with an event in given format
148 % and the current state, it should return the next
153 % raw - callback is given raw udp structure, use lwes_event:from_udp to
155 % list - callback is given an #lwes_event record where the name is a
156 % binary, and the attributes is a proplist where keys are binaries,
157 % and values are either integers (for lwes int types), binaries
158 % (for lwes strings), true|false atoms (for lwes booleans),
159 % or 4-tuples (for lwes ip addresses)
160 % tagged - callback is given an #lwes_event record where the name is a
161 % binary, and the attributes are 3-tuples with the first element
162 % the type of data, the second the key as a binary and the
163 % third the values as in the list format
164 % dict - callback is given an #lwes_event record where the name is a
165 % binary, and the attributes are a dictionary with a binary
166 % key and value according to the type
167 % json - this returns a proplist instead of an #lwes_event record. The
168 % values are mostly the same as list, but ip addresses are strings
169 % (as binary). This should mean you can pass the returned value
170 % to mochijson2:encode (or other json encoders), and have the event
172 % json_eep18 - uses the eep18 format of mochijson2 decode
173 % json_proplist - uses the proplist format of mochijson2 decode
175 % Initial State is whatever you want
% Register a callback on a listener channel.
%
%   Channel              - channel handed to lwes_channel:register_callback/4
%                          (presumably the value returned by
%                          open (listener, ...); confirm against caller)
%   CallbackFunction     - fun of arity 2, called with the event and the
%                          current state
%   EventType            - format events are delivered in, one of
%                          raw | tagged | list | dict | json |
%                          json_proplist | json_eep18
%   CallbackInitialState - any term, passed through untouched
%
% Returns whatever lwes_channel:register_callback/4 returns.
%
% Any other EventType, or a CallbackFunction which is not a fun of arity 2,
% fails with function_clause.
%
% NOTE: in a guard ',' binds tighter than ';', so writing
%   is_function (F, 2), T =:= raw ; T =:= tagged ; ...
% only checks the callback for the 'raw' case.  The check is combined
% explicitly with andalso/orelse so it applies to every event type.
listen (Channel, CallbackFunction, EventType, CallbackInitialState)
  when is_function (CallbackFunction, 2) andalso
       (EventType =:= raw orelse
        EventType =:= tagged orelse
        EventType =:= list orelse
        EventType =:= dict orelse
        EventType =:= json orelse
        EventType =:= json_proplist orelse
        EventType =:= json_eep18) ->
  lwes_channel:register_callback (Channel, CallbackFunction,
                                  EventType, CallbackInitialState).
185 % close the channel or channels
186 close (Channel
) when is_record (Channel
, lwes_channel
) ->
187 lwes_channel:close (Channel
);
188 close (StateIn
= {Module
,_
}) when is_atom (Module
) ->
189 Module:close (StateIn
);
191 lwes_multi_emitter:close (Channels
).
194 case application:get_application (lwes
) of
195 undefined
-> {error
, {not_started
, lwes
}};
197 lwes_stats:print(none
),
202 case application:get_application (lwes
) of
203 undefined
-> {error
, {not_started
, lwes
}};
205 lwes_stats:rollup(none
)
209 % enable validation of the events sent via this LWES client
210 % against the specification (ESF)
212 % ESFInfo - a list of tuples of the form { Name, FilePath }
214 % Example : [{Name1, path1}, {Name2, path2}]
216 % 'Name' is used to match the 'event' with a particular
219 % 'FilePath' is the path to the ESF File
220 enable_validation (ESFInfo
) ->
222 fun ({ESF
, File
}) -> lwes_esf_validator:add_esf (ESF
, File
) end,
225 %%====================================================================
226 %% Internal functions
227 %%====================================================================
229 %%====================================================================
231 %%====================================================================
233 -include_lib ("eunit/include/eunit.hrl").
235 -define(TEST_TABLE
, lwes_test
).
238 ok
= application:start(lwes
),
239 ets:new(?TEST_TABLE
,[named_table
,public
]),
243 ets:delete(?TEST_TABLE
),
244 application:stop(lwes
).
246 build_one (EventsToSend
, PerfectPercent
,
247 EmitterConfig
, EmitterType
, ListenerConfigs
) ->
251 {ok
, L
} = open(listener
, LC
),
253 fun({udp
,_
,_
,_
,E
},A
) ->
254 [{E
}] = ets:lookup(?TEST_TABLE
, E
),
260 || LC
<- ListenerConfigs
262 {ok
, Emitter0
} = open(EmitterType
,EmitterConfig
),
264 InitialEvent
= lwes_event:new("foo"),
267 fun (C
, EmitterIn
) ->
268 EventWithCounter
= lwes_event:set_uint16(InitialEvent
,"bar",C
),
269 Event
= lwes_event:to_binary(EventWithCounter
),
270 ets:insert(?TEST_TABLE
, {Event
}),
271 emit(EmitterIn
, Event
)
274 lists:seq(1,EventsToSend
)
278 [ close(L
) || L
<- Listeners
],
280 Rollup
= lwes_stats:rollup(none
),
282 lists:foldl (fun ([_
,S
,R
,_
,_
,_
],{AS
,AR
}) ->
283 % calculated the actual percent received
284 Percent
= S
/EventsToSend
*100,
285 % and then check if the difference is within
286 % 15 percent which is about what I was seeing
288 WithinBound
= abs(PerfectPercent
-Percent
) < 15,
292 ?
debugFmt("WARNING: out of bounds : "
293 "abs(~p-~p) < 15 => ~p~n",
294 [PerfectPercent
, Percent
, WithinBound
]),
297 ?
assert(WithinBound
),
303 ?
assertEqual(Sent
, Received
)
309 EmitterConfig
= {"127.0.0.1",12321},
310 EmitterType
= emitter
,
311 ListenerConfigs
= [ EmitterConfig
],
312 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
317 build_one (EventsToSend
, PerfectPercent
,
318 EmitterConfig
, EmitterType
, ListenerConfigs
)
322 mondemand_w_ttl_test_ () ->
323 Address
= {"127.0.0.1",12321, [{ttl
,25}]},
326 EmitterConfig
= {NumberToSendTo
, [Address
]},
327 EmitterType
= emitters
,
328 ListenerConfigs
= [ Address
],
329 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
334 build_one (EventsToSend
, PerfectPercent
,
335 EmitterConfig
, EmitterType
, ListenerConfigs
)
340 mondemand_legacy_test_ () ->
341 Address
= {"127.0.0.1",12321},
344 % mondemand used the queue form which I got rid of, now I'm converting
345 % those to group configs, so the following should work
346 EmitterConfig
= {NumberToSendTo
, [Address
]},
347 EmitterType
= emitters
,
348 ListenerConfigs
= [ Address
],
349 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
354 build_one (EventsToSend
, PerfectPercent
,
355 EmitterConfig
, EmitterType
, ListenerConfigs
)
359 multi_random_test_ () ->
361 ListenerConfigs
= [ {"127.0.0.1", 30000},
362 {"127.0.0.1", 30001},
365 EmitterConfig
= { NumberToSendTo
, random
, ListenerConfigs
},
366 EmitterType
= emitters
,
368 % calculate the perfect number of events per listener we should get
370 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
375 build_one (EventsToSend
, PerfectPercent
,
376 EmitterConfig
, EmitterType
, ListenerConfigs
)
380 grouped_random_test_ () ->
382 Group1Config
= [ { "127.0.0.1", 5301 },
383 { "127.0.0.1", 5302 }
385 Group2Config
= [ { "127.0.0.1", 5303 },
386 { "127.0.0.1", 5304 }
388 Group3Config
= [ { "127.0.0.1", 5305 },
389 { "127.0.0.1", 5306 }
391 ListenerConfigs
= Group1Config
++ Group2Config
++ Group3Config
,
392 EmitterConfig
= { NumberToSendTo
, group
,
394 { 1, random
, Group1Config
},
395 { 1, random
, Group2Config
},
396 { 1, random
, Group3Config
}
399 EmitterType
= emitters
,
401 PerfectPercent
= EventsToSend
/ length(ListenerConfigs
) * NumberToSendTo
,
406 build_one (EventsToSend
, PerfectPercent
,
407 EmitterConfig
, EmitterType
, ListenerConfigs
)