1 -module (lwes_channel
).
3 -behaviour (gen_server
).
5 -include_lib ("lwes.hrl").
6 -include ("lwes_internal.hrl").
9 -export ([ start_link
/1,
17 %% gen_server callbacks
%% Server state carried through the gen_server callbacks.
-record (state, {
  socket,    % bound in init/1 from the {ok, Socket} result of lwes_net_udp:open/2
  channel,   % matched as an #lwes_channel{} record in the callback heads
  type,      % NOTE(review): no use visible in this view of the file - confirm
  callback   % a #callback{} record, matched/updated by handle_call and handle_info
}).
%% Description of the handler installed on a channel.
-record (callback, {
  function,  % applied as Function (Event, CbState) when a udp packet arrives
  format,    % presumably the Format handed to lwes_event:from_udp_packet/2 - TODO confirm
  state      % presumably the accumulator threaded through Function - TODO confirm
}).
29 %%====================================================================
31 %%====================================================================
%% Start a channel server via gen_server:start_link/3 using this module as
%% the callback module.  Channel is delivered to init/1 as the only element
%% of the argument list; no gen_server options are passed.  The result of
%% gen_server:start_link/3 is returned unchanged.
start_link (Channel) ->
  InitArgs = [Channel],
  gen_server:start_link (?MODULE, InitArgs, []).
38 config
= lwes_net_udp:new (Type
, Config
),
43 { ok
, _Pid
} = lwes_channel_manager:open_channel (Channel
),
%% Build a {register, CallbackFunction, EventType, CallbackState} request and
%% hand it to find_and_call/2 for Channel.  Whatever find_and_call/2 returns
%% is returned to the caller.
register_callback (Channel, CallbackFunction, EventType, CallbackState) ->
  Request = { register, CallbackFunction, EventType, CallbackState },
  find_and_call (Channel, Request).
%% Wrap Msg in a {send, Msg} request and hand it to find_and_call/2 for
%% Channel.  Whatever find_and_call/2 returns is returned to the caller.
send_to (Channel, Msg) ->
  Request = { send, Msg },
  find_and_call (Channel, Request).
54 find_and_cast (Channel
, stop
).
56 %%====================================================================
57 %% gen_server callbacks
58 %%====================================================================
59 init ([ Channel
= #lwes_channel
{ type
= Type
, config
= Config
} ]) ->
60 { ok
, Socket
} = lwes_net_udp:open (Type
, Config
),
61 lwes_stats:initialize (lwes_net_udp:address(Config
)),
62 lwes_channel_manager:register_channel (Channel
, self()),
63 { ok
, #state
{ socket
= Socket
,
69 handle_call ({ register, Function
, Format
, Accum
},
72 channel
= #lwes_channel
{type
= listener
}
76 State#state
{ callback
= #callback
{ function = Function
,
80 handle_call ({ send
, Packet
},
84 channel
= #lwes_channel
{ config
= Config
}
87 case lwes_net_udp:send (Socket
, Config
, Packet
) of
89 lwes_stats:increment_sent(lwes_net_udp:address(Config
)),
92 lwes_stats:increment_errors(lwes_net_udp:address(Config
)),
95 { reply
, Reply
, State
};
96 handle_call (_Request
, _From
, State
) ->
99 handle_cast (stop
, State
) ->
100 {stop
, normal
, State
};
101 handle_cast (_Request
, State
) ->
104 % skip if we don't have a handler
105 handle_info ( {udp
, _
, _
, _
, _
},
108 channel
= #lwes_channel
{ config
= Config
},
111 lwes_stats:increment_received (lwes_net_udp:address(Config
)),
114 handle_info ( Packet
= {udp
, _
, _
, _
, _
},
117 channel
= #lwes_channel
{ config
= Config
},
118 callback
= #callback
{ function = Function
,
122 lwes_stats:increment_received (lwes_net_udp:address(Config
)),
123 Event
= lwes_event:from_udp_packet (Packet
, Format
),
124 NewCbState
= Function (Event
, CbState
),
126 State#state
{ callback
= #callback
{ function = Function
,
132 handle_info (_Request
, State
) ->
%% gen_server terminate callback: close the socket held in the state first,
%% then unregister the channel with lwes_channel_manager.  The termination
%% reason is ignored.
terminate (_Reason, #state { channel = ChannelInfo, socket = UdpSocket }) ->
  lwes_net_udp:close (UdpSocket),
  lwes_channel_manager:unregister_channel (ChannelInfo).
139 code_change (_OldVsn
, State
, _Extra
) ->
142 %%====================================================================
143 %% Internal functions
144 %%====================================================================
145 find_and_call (Channel
, Msg
) ->
146 case lwes_channel_manager:find_channel (Channel
) of
150 gen_server:call ( Pid
, Msg
)
153 find_and_cast (Channel
, Msg
) ->
154 case lwes_channel_manager:find_channel (Channel
) of
158 gen_server:cast ( Pid
, Msg
)
161 %%====================================================================
163 %%====================================================================
165 -include_lib ("eunit/include/eunit.hrl").