%% various fixes, see ChangeLog
%% [lwes-erlang/github-mirror.git] / src / lwes_channel.erl
%% blob ed98d5eee96f6056938dce9c83f38f2cc28e68fc
1 -module (lwes_channel).
3 -behaviour (gen_server).
5 -include_lib ("lwes.hrl").
6 -include ("lwes_internal.hrl").
8 %% API
9 -export ([ start_link/1,
10 open/2,
11 register_callback/4,
12 send_to/2,
13 close/1,
14 stats/1
15 ]).
17 %% gen_server callbacks
18 -export ([ init/1,
19 handle_call/3,
20 handle_cast/2,
21 handle_info/2,
22 terminate/2,
23 code_change/3
24 ]).
26 -record (state, {socket, channel, type, callback, sent = 0, received = 0}).
27 -record (callback, {function, format, state}).
29 %%====================================================================
30 %% API functions
31 %%====================================================================
32 start_link (Channel) ->
33 gen_server:start_link (?MODULE, [Channel], []).
35 open (Type, {Ip, Port, TTL, Recbuf}) ->
36 Channel = #lwes_channel {
37 ip = Ip,
38 port = Port,
39 is_multicast = is_multicast (Ip),
40 ttl = TTL,
41 recbuf = Recbuf,
42 type = Type,
43 ref = make_ref ()
45 { ok, _Pid } = lwes_channel_manager:open_channel (Channel),
46 { ok, Channel};
47 open (Type, {Ip, Port, TTL}) ->
48 Channel = #lwes_channel {
49 ip = Ip,
50 port = Port,
51 is_multicast = is_multicast (Ip),
52 ttl = TTL,
53 type = Type,
54 ref = make_ref ()
56 { ok, _Pid } = lwes_channel_manager:open_channel (Channel),
57 { ok, Channel};
58 open (Type, {Ip, Port}) ->
59 Channel = #lwes_channel {
60 ip = Ip,
61 port = Port,
62 is_multicast = is_multicast (Ip),
63 type = Type,
64 ref = make_ref ()
66 { ok, _Pid } = lwes_channel_manager:open_channel (Channel),
67 { ok, Channel}.
69 register_callback (Channel, CallbackFunction, EventType, CallbackState) ->
70 find_and_call ( Channel,
71 { register, CallbackFunction, EventType, CallbackState }).
73 send_to (Channel, Msg) ->
74 find_and_call (Channel, { send, Msg }).
76 close (Channel) ->
77 find_and_cast (Channel, stop).
79 stats (Channel) ->
80 find_and_call (Channel, stats).
82 %%====================================================================
83 %% gen_server callbacks
84 %%====================================================================
85 init ([ Channel = #lwes_channel {
86 ip = Ip,
87 port = Port,
88 is_multicast = IsMulticast,
89 ttl = TTL,
90 recbuf = Recbuf,
91 type = Type
93 ]) ->
94 { ok, Socket }=
95 case {Type, IsMulticast} of
96 {listener, true} ->
97 gen_udp:open ( Port,
98 [ { reuseaddr, true },
99 { ip, Ip },
100 { multicast_ttl, TTL },
101 { multicast_loop, false },
102 { add_membership, {Ip, {0,0,0,0}}},
103 { recbuf, Recbuf },
104 binary
105 | reuseport()
107 {listener, false} ->
108 gen_udp:open ( Port,
109 [ { recbuf, Recbuf },
110 { reuseaddr, true },
111 binary
112 | reuseport()
114 {_, _} ->
115 case IsMulticast of
116 true ->
117 gen_udp:open ( 0,
118 [ { recbuf, Recbuf },
119 { multicast_ttl, TTL },
120 binary
122 false ->
123 gen_udp:open ( 0,
124 [ { recbuf, Recbuf },
125 binary
128 end,
129 lwes_channel_manager:register_channel (Channel, self()),
130 { ok, #state { socket = Socket,
131 channel = Channel,
132 type = Type
136 handle_call ({ register, Function, Format, Accum },
137 _From,
138 State = #state {
139 channel = #lwes_channel {type = listener }
140 }) ->
141 { reply,
143 State#state { callback = #callback { function = Function,
144 format = Format,
145 state = Accum } } };
147 handle_call ({ send, Packet },
148 _From,
149 State = #state {
150 socket = Socket,
151 channel = #lwes_channel { ip = Ip, port = Port },
152 sent = Sent
153 }) ->
154 { reply,
155 gen_udp:send (Socket, Ip, Port, Packet),
156 State#state { sent = Sent + 1 }
158 handle_call (stats,
159 _From,
160 State = #state {
161 sent = Sent,
162 received = Received
163 }) ->
164 { reply, {Sent, Received}, State };
166 handle_call (Request, From, State) ->
167 error_logger:warning_msg ("unrecognized call ~p from ~p~n",[Request, From]),
168 { reply, ok, State }.
170 handle_cast (stop, State) ->
171 {stop, normal, State};
172 handle_cast (Request, State) ->
173 error_logger:warning_msg ("unrecognized cast ~p~n",[Request]),
174 { noreply, State }.
176 % skip if we don't have a handler
177 handle_info ( {udp, _, _, _, _},
178 State = #state {
179 type = listener,
180 callback = undefined,
181 received = Received
182 } ) ->
183 { noreply, State#state { received = Received + 1 } };
185 handle_info ( Packet = {udp, _, _, _, _},
186 State = #state {
187 type = listener,
188 callback = #callback { function = Function,
189 format = Format,
190 state = CbState },
191 received = Received
192 } ) ->
193 Event =
194 case Format of
195 raw -> Packet;
196 _ -> lwes_event:from_udp_packet (Packet, Format)
197 end,
198 NewCbState = Function (Event, CbState),
199 { noreply,
200 State#state { callback = #callback { function = Function,
201 format = Format,
202 state = NewCbState },
203 received = Received + 1
207 handle_info ( Request, State) ->
208 error_logger:warning_msg ("unrecognized info ~p~n",[Request]),
209 {noreply, State}.
211 terminate (_Reason, #state {socket = Socket, channel = Channel}) ->
212 gen_udp:close (Socket),
213 lwes_channel_manager:unregister_channel (Channel).
215 code_change (_OldVsn, State, _Extra) ->
216 {ok, State}.
218 %%====================================================================
219 %% Internal functions
220 %%====================================================================
221 is_multicast ({N1, _, _, _}) when N1 >= 224, N1 =< 239 ->
222 true;
223 is_multicast (_) ->
224 false.
226 reuseport() ->
227 case os:type() of
228 {unix, linux} ->
229 [ {raw, 1, 15, <<1:32/native>>} ];
230 {unix, OS} when OS =:= darwin;
231 OS =:= freebsd;
232 OS =:= openbsd;
233 OS =:= netbsd ->
234 [ {raw, 16#ffff, 16#0200, <<1:32/native>>} ];
235 _ -> []
236 end.
238 find_and_call (Channel, Msg) ->
239 case lwes_channel_manager:find_channel (Channel) of
240 {error, not_open} ->
241 {error, not_open};
242 Pid ->
243 gen_server:call ( Pid, Msg )
244 end.
246 find_and_cast (Channel, Msg) ->
247 case lwes_channel_manager:find_channel (Channel) of
248 {error, not_open} ->
249 {error, not_open};
250 Pid ->
251 gen_server:cast ( Pid, Msg )
252 end.
254 %%====================================================================
255 %% Test functions
256 %%====================================================================
257 -ifdef (TEST).
258 -include_lib ("eunit/include/eunit.hrl").
260 -endif.