Merge pull request #10 from lwes/feature/fixes
[lwes-erlang/github-mirror.git] / src / lwes.erl
blobe35cd67cd68f92849cfb55babffcbbb6476191aa
1 %%%
2 %%% Light Weight Event System (LWES)
3 %%%
4 %%% Creating Events
5 %%% Event0 = lwes_event:new ("MyEvent"),
6 %%% Event1 = lwes_event:set_uint16 (Event0, "MyUint16", 25),
7 %%%
8 %%% Emitting to a single channel
9 %%%
10 %%% {ok, Channel0} = lwes:open (emitter, {Ip, Port})
11 %%% Channel1 = lwes:emit (Channel0, Event1).
12 %%%
13 %%% Emit to several channels
14 %%%
15 %%% % emit to 1 of a set in a round robin fashion
16 %%% {ok, Channels0} = lwes:open (emitters, {1, [{Ip1,Port1},...{IpN,PortN}]})
17 %%% Channels1 = lwes:emit (Channels0, Event1)
18 %%% Channels2 = lwes:emit (Channels1, Event2)
19 %%% ...
20 %%% lwes:close (ChannelsN)
21 %%%
22 %%% % emit to 2 of a set in an m of n fashion (ie, emit to first 2 in list,
23 %%% % then 2nd and 3rd, then 3rd and 4th, etc., wraps at end of list)
24 %%% {ok, Channels0} = lwes:open (emitters, {2, [{Ip1,Port1},...{IpN,PortN}]})
25 %%%
26 %%% Listening via callback
27 %%%
28 %%% {ok, Channel} = lwes:open (listener, {Ip, Port})
29 %%% lwes:listen (Channel, Fun, Type, Accum).
30 %%%
31 %%% Fun is called for each event
32 %%%
33 %%% Closing channel
34 %%%
35 %%% lwes:close (Channel)
37 -module (lwes).
39 -include_lib ("lwes.hrl").
40 -include ("lwes_internal.hrl").
42 %% API
43 -export ([ start/0,
44 open/2,
45 emit/2,
46 emit/3,
47 listen/4,
48 close/1,
49 stats/0,
50 stats_raw/0,
51 enable_validation/1 ]).
53 %%====================================================================
54 %% API functions
55 %%====================================================================
% start the lwes application
%
% returns whatever application:start/2 returns for the lwes application
start () ->
  % application:start/1 is documented as start (Application, temporary),
  % so the restart type is simply spelled out here
  application:start (lwes, temporary).
61 % open an lwes emitter, listener or set of emitters
63 % config for emitter/listener is
64 % { Ip, Port }
65 % config for emitters (aka, multi emitter) is as follows; the strategy is
66 % optional and defaults to queue for backward compatibility
67 % { NumberToSendToInThisGroup, [queue | random]
68 % [
69 % {Ip0,Port0},
70 % ...,
71 % {IpN,PortN}
72 % ]
73 % }
74 % config for groups is
75 % { NumberOfGroupsToSendTo,
76 % group,
77 % [
78 % { NumberToSendToInThisGroup,
79 % Type,
80 % [
81 % {Ip0,Port0},
82 % ...
83 % {IpN,PortN}
84 % ]
85 % },
86 % ...
87 % ]
88 % }
89 % an example group emission might be
90 % { 2,
91 % group,
92 % [
93 % { 1,
94 % random,
95 % [ {Ip0, Port0},
96 % ...
97 % {IpN, PortN}
98 % ]
99 % },
100 % { 1,
101 % random,
102 % [ {IpN+1, PortN+1},
103 % ...
104 % {IpN+M, PortN+M}
109 % which should send each event to one machine in each group
% open (Type, Config)
%
%   emitters           - Config is handed untouched to lwes_multi_emitter:open/1
%   emitter | listener - Config is first passed through
%                        lwes_util:check_ip_port/1; if that raises for any
%                        reason the result is { error, bad_ip_port },
%                        otherwise the checked value is given to
%                        lwes_channel:open/2
%   anything else      - { error, bad_type }
open (emitters, Config) ->
  lwes_multi_emitter:open (Config);
open (emitter, Config) ->
  open_checked (emitter, Config);
open (listener, Config) ->
  open_checked (listener, Config);
open (_, _) ->
  { error, bad_type }.

% validate the config, then open one channel of the given type; only the
% validation is protected by the try, failures inside lwes_channel:open/2
% are not caught here
open_checked (Type, Config) ->
  try lwes_util:check_ip_port (Config) of
    Checked -> lwes_channel:open (Type, Checked)
  catch
    _:_ -> { error, bad_ip_port }
  end.
% emit (Channels, EventOrEvents) -> Channels'
%
% An empty list emits nothing and returns the channels unchanged.  A list
% whose head is an #lwes_event{} record is emitted one event at a time, left
% to right, threading the returned channels into the next emit.
%
% Anything else is serialized with lwes_event:to_binary/1 and then either
% sent on a single #lwes_channel{} or handed to the multi emitter.
emit (Channels, []) ->
  Channels;
emit (Channels, [ #lwes_event{} = First | Rest ]) ->
  emit (emit (Channels, First), Rest);
emit (#lwes_channel{} = Channel, Event) ->
  Packet = lwes_event:to_binary (Event),
  lwes_channel:send_to (Channel, Packet),
  % the result of send_to is not inspected and a single emitter is handed
  % back exactly as it was passed in
  Channel;
emit (#lwes_multi_emitter{} = Emitters, Event) ->
  lwes_multi_emitter:emit (Emitters, lwes_event:to_binary (Event)).
% emit an event only if it validates against the named ESF specification
%
% The event is checked with lwes_esf_validator:validate (SpecName, Event).
% When that returns true the event is emitted via emit/2 and its result is
% returned; for any other result an error is logged and Channel is returned
% unchanged without anything being sent.
emit (Channel, Event, SpecName) ->
  Verdict = lwes_esf_validator:validate (SpecName, Event),
  emit_validated (Verdict, Channel, Event).

% dispatch on the validator's verdict
emit_validated (true, Channel, Event) ->
  emit (Channel, Event);
emit_validated (_, Channel, Event) ->
  error_logger:error_msg("validation failed for event '~s'",
                         [Event#lwes_event.name]),
  Channel.
149 % listen for events
151 % Callback function - function is called with an event in given format
152 % and the current state, it should return the next
153 % state
155 % Type is one of
157 % raw - callback is given raw udp structure, use lwes_event:from_udp to
158 % turn into event
159 % list - callback is given an #lwes_event record where the name is a
160 % binary, and the attributes is a proplist where keys are binaries,
161 % and values are either integers (for lwes int types), binaries
162 % (for lwes strings), true|false atoms (for lwes booleans),
163 % or 4-tuples (for lwes ip addresses)
164 % tagged - callback is given an #lwes_event record where the name is a
165 % binary, and the attributes are 3-tuples with the first element
166 % the type of data, the second the key as a binary and the
167 % third the values as in the list format
168 % dict - callback is given an #lwes_event record where the name is a
169 % binary, and the attributes are a dictionary with a binary
170 % key and value according to the type
171 % json - this returns a proplist instead of an #lwes_event record. The
172 % values are mostly the same as list, but ip addresses are strings
173 % (as binary). This means you can pass the returned value
174 % to mochijson2:encode (or other json encoders), and have the event
175 % as a json document
176 % json_eep18 - uses the eep18 format of mochijson2 decode
177 % json_proplist - uses the proplist format of mochijson2 decode
179 % Initial State is whatever you want
% listen (Channel, CallbackFunction, EventType, CallbackInitialState)
%
% Registers CallbackFunction on Channel via lwes_channel:register_callback/4
% and returns its result.
%
%   CallbackFunction     - must be a fun of arity 2
%   EventType            - one of raw, tagged, list, dict, json,
%                          json_proplist or json_eep18
%   CallbackInitialState - passed through untouched
%
% Any other combination fails with function_clause.
%
% The guard is written with andalso/orelse on purpose: in a guard ',' binds
% tighter than ';', so the arity check has to be combined explicitly with
% the whole set of event types, otherwise it only protects the first
% alternative.
listen (Channel, CallbackFunction, EventType, CallbackInitialState)
  when is_function (CallbackFunction, 2) andalso
       (EventType =:= raw orelse EventType =:= tagged orelse
        EventType =:= list orelse EventType =:= dict orelse
        EventType =:= json orelse EventType =:= json_proplist orelse
        EventType =:= json_eep18) ->
  lwes_channel:register_callback (Channel, CallbackFunction,
                                  EventType, CallbackInitialState).
% close a single channel or a multi emitter
%
% the argument's record type selects the module whose close/1 is called,
% and that call's result is returned; any other term fails with
% function_clause
close (#lwes_channel{} = Channel) ->
  lwes_channel:close (Channel);
close (#lwes_multi_emitter{} = Emitters) ->
  lwes_multi_emitter:close (Emitters).
% print a table of per-channel counters to standard output
%
% Returns {error, {not_started, lwes}} when
% application:get_application (lwes) reports undefined.  Otherwise a header
% and one row per entry of lwes_channel_manager:stats/0 are printed
% (channel as "Ip:Port", events sent, events received) and ok is returned.
stats () ->
  case application:get_application (lwes) of
    undefined -> {error, {not_started, lwes}};
    _ ->
      io:format ("~-21s ~-20s ~-20s~n",["channel", "sent", "received"]),
      io:format ("~-21s ~-20s ~-20s~n",["---------------------",
                                        "--------------------",
                                        "--------------------"]),
      [ io:format ("~-21s ~-20b ~-20b~n",
                   [ io_lib:format ("~s:~-5b",[lwes_util:ip2bin (Ip), Port]),
                     Sent, Received])
        || {Ip, Port, Sent, Received }
             <- lwes_channel_manager:stats () ],
      % the comprehension is only evaluated for its printing side effect;
      % the branch needs a final expression, so report success explicitly
      ok
  end.
% return the per-channel counters as data instead of printing them
%
% Returns {error, {not_started, lwes}} when
% application:get_application (lwes) reports undefined, otherwise a list of
% { "Ip:Port", Sent, Received } tuples, one per entry of
% lwes_channel_manager:stats/0.
stats_raw () ->
  case application:get_application (lwes) of
    undefined ->
      {error, {not_started, lwes}};
    _ ->
      Rows = lwes_channel_manager:stats (),
      [ { channel_label (Ip, Port), Sent, Received }
        || { Ip, Port, Sent, Received } <- Rows ]
  end.

% flat "Ip:Port" string for one channel, port left-justified in 5 columns
channel_label (Ip, Port) ->
  lists:flatten (io_lib:format ("~s:~-5b",[lwes_util:ip2bin (Ip), Port])).
223 % enable validation of the events sent via this LWES client
224 % against the specification (ESF)
226 % ESFInfo - a list of tuples of the form { Name, FilePath }
228 % Example : [{Name1, path1}, {Name2, path2}]
230 % 'Name' is used to match the 'event' with a particular
231 % ESF File
233 % 'FilePath' is the path to the ESF File
% register every { Name, FilePath } pair of ESFInfo with the ESF validator,
% in list order; returns ok
enable_validation (ESFInfo) ->
  lists:foreach (fun register_esf/1, ESFInfo).

% hand one { Name, FilePath } pair to lwes_esf_validator:add_esf/2
register_esf ({Name, FilePath}) ->
  lwes_esf_validator:add_esf (Name, FilePath).
239 %%====================================================================
240 %% Internal functions
241 %%====================================================================
243 %%====================================================================
244 %% Test functions
245 %%====================================================================
246 -ifdef (TEST).
247 -include_lib ("eunit/include/eunit.hrl").
249 -endif.