2 % This module implements an M of N emitter. Given N different channels,
3 % when emit is called, the event is emitted over M of them. You can use
4 % this in situations where you want to mimic multicast by emitting the
5 % same event to multiple places, in addition to load balancing the emission
6 % across a set of machines
8 -module (lwes_multi_emitter
).
10 -include ("lwes_internal.hrl").
18 %%====================================================================
20 %%====================================================================
23 open ({M
, group
, N
}) ->
25 _
when is_integer (M
), M
>= 1, M
=:= length (N
) ->
27 #lwes_multi_emitter
{ type
= group
, m
= M
,
28 n
= [ begin {ok
, Cout
} = open (C
), Cout
end || C
<- N
] }};
30 { error
, bad_m_value
}
32 open ({M
, Type
, N
}) ->
34 _
when is_integer (M
), M
>= 1 ->
35 case open_emitters (N
) of
42 MyN
= queue:from_list (Combos
),
43 { ok
, #lwes_multi_emitter
{ type
= queue
, m
= M
, n
= MyN
} };
45 MyN
= list_to_tuple (Combos
),
46 {ok
, #lwes_multi_emitter
{ type
= random
, m
= M
, n
= MyN
}}
50 { error
, bad_m_value
}
53 emit (Emitters
= #lwes_multi_emitter
{ type
= group
, n
= NIn
}, Bin
) ->
54 Emitters#lwes_multi_emitter
{ n
= [ emit (N
, Bin
) || N
<- NIn
] };
55 emit (Emitters
= #lwes_multi_emitter
{ type
= random
, n
= NIn
}, Bin
) ->
56 % get list to emit to as a random entry from list
57 Index
= crypto:rand_uniform (1, tuple_size (NIn
) + 1),
58 ToEmitTo
= element (Index
, NIn
),
61 lists:foreach (fun (E
) ->
62 lwes_channel:send_to (E
, Bin
)
66 emit (Emitters
= #lwes_multi_emitter
{ type
= queue
, n
= NIn
}, Bin
) ->
67 % get list to emit to from queue
68 {{value
, ToEmitTo
},NTmp
} = queue:out (NIn
),
71 lists:foreach (fun (E
) ->
72 lwes_channel:send_to (E
, Bin
)
76 NOut
= queue:in (ToEmitTo
, NTmp
),
78 Emitters#lwes_multi_emitter
{ n
= NOut
}.
80 close (#lwes_multi_emitter
{ type
= group
, n
= N
}) ->
81 [ close (E
) || E
<- N
],
83 close (#lwes_multi_emitter
{ type
= random
, n
= N
}) ->
84 close_emitters (lists:usort(lists:flatten(tuple_to_list (N
)))),
86 close (#lwes_multi_emitter
{ type
= queue
, n
= N
}) ->
87 close_emitters (lists:usort(lists:flatten(queue:to_list(N
)))),
90 %%====================================================================
92 %%====================================================================
93 open_emitters (EmittersConfig
) ->
94 open_emitters (EmittersConfig
, []).
96 open_emitters ([], E
) ->
98 open_emitters ([Config
| R
], E
) ->
99 case lwes:open (emitter
, Config
) of
104 open_emitters (R
, [Emitter
| E
])
107 close_emitters (Emitters
) ->
108 lists:map (fun (E
) -> lwes:close (E
) end, Emitters
).
110 % get next m values from queue, then put back m-1 values
112 % get first M elements from Q
113 {FQ
, RQ
} = queue:split (M
, Q
),
115 % save as the list we will return
116 O
= queue:to_list (FQ
),
118 % join the queue back together
119 TQ
= queue:join (FQ
, RQ
),
121 % take the first value from the queue
122 {{value
, H
},OQ
} = queue:out(TQ
),
124 % put in back at the end
125 NQ
= queue:in (H
, OQ
),
128 % find all m sets of values in a list of values
130 Q
= queue:from_list (N
),
132 lists:foldl (fun (_
, {QI
,A
}) ->
133 {AO
, QO
} = nextm (M
, QI
),
140 %%====================================================================
142 %%====================================================================
144 -include_lib ("eunit/include/eunit.hrl").