various fixes, see ChangeLog
[lwes-erlang/github-mirror.git] / src / lwes_multi_emitter.erl
blob35d877e109ac26d550f80725d3cf8b2e83a20a3a
2 % This module implements an M of N emitter. Given N different channels,
3 % when emit is called, the event is emitted over M of them. You can use
4 % this in situations where you want to mimic multicast by emitting the
5 % same event to multiple places, in addition to load balancing the emission
6 % across a set of machines
8 -module (lwes_multi_emitter).
10 -include ("lwes_internal.hrl").
12 %% API
13 -export ([ open/1,
14 emit/2,
15 close/1
16 ]).
18 %%====================================================================
19 %% API
20 %%====================================================================
21 open ({M, N}) ->
22 open ({M, queue, N});
23 open ({M, group, N}) ->
24 case M of
25 _ when is_integer (M), M >= 1, M =:= length (N) ->
26 {ok,
27 #lwes_multi_emitter { type = group, m = M,
28 n = [ begin {ok, Cout} = open (C), Cout end || C <- N] }};
29 _ ->
30 { error, bad_m_value }
31 end;
32 open ({M, Type, N}) ->
33 case M of
34 _ when is_integer (M), M >= 1 ->
35 case open_emitters (N) of
36 {error, Error} ->
37 {error, Error};
38 E ->
39 Combos = allm (M, E),
40 case Type of
41 queue ->
42 MyN = queue:from_list (Combos),
43 { ok, #lwes_multi_emitter { type = queue, m = M, n = MyN } };
44 random ->
45 MyN = list_to_tuple (Combos),
46 {ok, #lwes_multi_emitter { type = random, m = M, n = MyN }}
47 end
48 end;
49 _ ->
50 { error, bad_m_value }
51 end.
53 emit (Emitters = #lwes_multi_emitter { type = group, n = NIn }, Bin) ->
54 Emitters#lwes_multi_emitter { n = [ emit (N, Bin) || N <- NIn ] };
55 emit (Emitters = #lwes_multi_emitter { type = random, n = NIn }, Bin) ->
56 % get list to emit to as a random entry from list
57 Index = crypto:rand_uniform (1, tuple_size (NIn) + 1),
58 ToEmitTo = element (Index, NIn),
60 % emit event to each
61 lists:foreach (fun (E) ->
62 lwes_channel:send_to (E, Bin)
63 end, ToEmitTo),
64 Emitters;
66 emit (Emitters = #lwes_multi_emitter { type = queue, n = NIn }, Bin) ->
67 % get list to emit to from queue
68 {{value, ToEmitTo},NTmp} = queue:out (NIn),
70 % emit event to each
71 lists:foreach (fun (E) ->
72 lwes_channel:send_to (E, Bin)
73 end, ToEmitTo),
75 % put back into queue
76 NOut = queue:in (ToEmitTo, NTmp),
78 Emitters#lwes_multi_emitter { n = NOut }.
80 close (#lwes_multi_emitter { type = group, n = N }) ->
81 [ close (E) || E <- N ],
82 ok;
83 close (#lwes_multi_emitter { type = random, n = N }) ->
84 close_emitters (lists:usort(lists:flatten(tuple_to_list (N)))),
85 ok;
86 close (#lwes_multi_emitter { type = queue, n = N }) ->
87 close_emitters (lists:usort(lists:flatten(queue:to_list(N)))),
88 ok.
90 %%====================================================================
91 %% Internal functions
92 %%====================================================================
93 open_emitters (EmittersConfig) ->
94 open_emitters (EmittersConfig, []).
96 open_emitters ([], E) ->
98 open_emitters ([Config | R], E) ->
99 case lwes:open (emitter, Config) of
100 {error, Error} ->
101 close_emitters (E),
102 {error, Error};
103 {ok, Emitter} ->
104 open_emitters (R, [Emitter | E])
105 end.
107 close_emitters (Emitters) ->
108 lists:map (fun (E) -> lwes:close (E) end, Emitters).
110 % get next m values from queue, then put back m-1 values
111 nextm (M, Q) ->
112 % get first M elements from Q
113 {FQ, RQ} = queue:split (M, Q),
115 % save as the list we will return
116 O = queue:to_list (FQ),
118 % join the queue back together
119 TQ = queue:join (FQ, RQ),
121 % take the first value from the queue
122 {{value, H},OQ} = queue:out(TQ),
124 % put in back at the end
125 NQ = queue:in (H, OQ),
126 {O, NQ}.
128 % find all m sets of values in a list of values
129 allm (M, N) ->
130 Q = queue:from_list (N),
131 {_,Out} =
132 lists:foldl (fun (_, {QI,A}) ->
133 {AO, QO} = nextm (M, QI),
134 {QO, [AO|A]}
135 end,
136 {Q, []},
138 lists:reverse (Out).
140 %%====================================================================
141 %% Test functions
142 %%====================================================================
143 -ifdef (TEST).
144 -include_lib ("eunit/include/eunit.hrl").
146 -endif.