restructure to be a littel more flexible
[lwes-erlang/github-mirror.git] / src / lwes_emitter_udp_pool.erl
blobe540b49d42e3ce9c64b11f914fe4f86735504a48
1 -module (lwes_emitter_udp_pool).
3 -behaviour (gen_server).
5 %% API
6 -export ([start_link/1,
7 send/3
8 ]).
10 -export ([init/1,
11 handle_call/3,
12 handle_cast/2,
13 handle_info/2,
14 terminate/2,
15 code_change/3]).
17 -record (state, {id, queue = queue:new() }).
19 % there are 2 type of records stored in the ets table, we'll use the
20 % first element as the keypos, so either the port for a socket, or
21 % the id for config
22 -record (connection, {socket,
23 create_time_epoch_seconds,
24 access_time_epoch_seconds,
25 max_age_seconds}).
26 -define (CONNECTION_CREATE_TIME_INDEX, #connection.create_time_epoch_seconds).
27 -define (CONNECTION_LAST_ACCESS_INDEX, #connection.access_time_epoch_seconds).
28 -define (CONNECTION_MAX_AGE_INDEX, #connection.max_age_seconds).
30 -record (config, {id,
31 max_age_seconds = 60,
32 max_connections = 65535, % set unreasonably high?
33 active = 0,
34 busy = 0
35 }).
36 -define (CONFIG_MAX_INDEX, #config.max_connections).
37 -define (CONFIG_ACTIVE_INDEX, #config.active).
38 -define (CONFIG_BUSY_INDEX, #config.busy).
39 -define (CONFIG_MAX_AGE_INDEX, #config.max_age_seconds).
41 -define (TABLE, lwes_emitters).
43 %%====================================================================
44 %% API
45 %%====================================================================
46 start_link (Config = #config {id = Id}) ->
47 gen_server:start_link({local, Id}, ?MODULE, [Config], []);
48 start_link (ConfigList) when is_list (ConfigList) ->
49 case parse_config (ConfigList, #config {}) of
50 {error, E} -> {stop, {error, E}};
51 {ok, Config}-> start_link (Config)
52 end.
54 send (Id, Address, Packet) ->
55 case checkout (Id) of
56 {error, Error} ->
57 {error, {checkout, Error}}; % Error is {error, busy} or {error, down}
58 Socket ->
59 case lwes_net_udp:send (Socket, Address, Packet) of
60 {error, Error} ->
61 checkin (Id, Socket, error),
62 {error, {call, Error}};
63 Answer ->
64 checkin (Id, Socket),
65 Answer
66 end
67 end.
69 out (Id) -> gen_server:call (Id, {out}).
70 in (Id, Socket) -> gen_server:cast (Id, {in, Socket}).
72 checkout (Id) ->
73 case out (Id) of
74 empty -> open (Id);
75 {value, Socket} -> Socket
76 end.
78 checkin (Id, Socket, error) ->
79 close (Id, Socket).
81 checkin (Id, Socket) ->
82 % at checkin, we'll do all our housekeeping
84 % get the time now
85 Now = seconds_since_epoch(),
87 % fetch out the start time and set the last access
88 [Start, MaxAge] =
89 ets:update_counter (Id, Socket,
90 [ {?CONNECTION_CREATE_TIME_INDEX, 0},
91 {?CONNECTION_MAX_AGE_INDEX, 0} ] ),
93 % check to see if we've been alive for more than the alloted alive time
94 case Now - Start > MaxAge of
95 true ->
96 % alive too long, so close
97 close (Id, Socket);
98 false ->
99 ets:update_element (Id, Socket,
100 {?CONNECTION_LAST_ACCESS_INDEX, Now}),
101 in (Id, Socket)
102 end.
104 open (Id) ->
105 % optimistically update the active connections, so we don't go above max
106 [Max, Active] =
107 ets:update_counter (Id, Id,
108 [ {?CONFIG_MAX_INDEX,0},
109 {?CONFIG_ACTIVE_INDEX,1}
112 % then check if the active connections are more than the max
113 case Active > Max of
114 true ->
115 % we decrement active connections because we optimistically incremented
116 % above, also increment busy index.
117 ets:update_counter (Id, Id, [{?CONFIG_BUSY_INDEX,1},
118 {?CONFIG_ACTIVE_INDEX,-1}]),
119 {error, busy};
120 false ->
121 case ets:lookup (Id, Id) of
122 [#config { max_age_seconds = MaxAge } ] ->
123 % address doesn't matter for emitter's it's added at send time
124 Dummy = lwes_net_udp:new (emitter, {"127.0.0.1",9191}),
125 case lwes_net_udp:open (emitter, Dummy) of
126 {ok, Socket} ->
127 % Let this gen_server be the controlling process, not the
128 % process opening the connection, this allows the process
129 % opening the connection to die without killing the connection
130 ok = gen_udp:controlling_process (Socket, whereis (Id)),
132 Now = seconds_since_epoch(),
133 ets:insert_new (Id,
134 #connection { socket = Socket,
135 create_time_epoch_seconds = Now,
136 access_time_epoch_seconds = Now,
137 max_age_seconds = MaxAge
139 Socket;
140 {error, E} ->
141 % we decrement active connections because we optimistically
142 % incremented above, also increment busy index.
143 ets:update_counter (Id, Id, [{?CONFIG_BUSY_INDEX,1},
144 {?CONFIG_ACTIVE_INDEX,-1}]),
146 error_logger:error_msg ("Unexpected connect error 1 : ~p", [E]),
147 {error, down}
148 end;
149 E2 ->
150 % we decrement active connections because we optimistically
151 % incremented above, also increment busy index.
152 ets:update_counter (Id, Id, [{?CONFIG_BUSY_INDEX,1},
153 {?CONFIG_ACTIVE_INDEX,-1}]),
155 error_logger:error_msg ("Unexpected connect error 2 : ~p", [E2]),
156 {error, down}
158 end.
160 close (Id, Socket) ->
161 ets:update_counter (Id, Id, [{?CONFIG_ACTIVE_INDEX,-1}]),
162 ets:delete (Id, Socket),
163 lwes_net_udp:close (Socket).
165 %%====================================================================
166 %% gen_server callbacks
167 %%====================================================================
168 init([Config = #config { id = Id }]) ->
169 ets:new (Id,
170 [ named_table,
171 public,
172 set,
173 {keypos, 2},
174 {write_concurrency, true}
176 ets:insert_new (Id, Config),
177 {ok, #state { id = Id }}.
179 handle_call ({size}, _From, State = #state { queue = QueueIn }) ->
180 {reply, queue:len (QueueIn), State};
181 handle_call ({out}, _From, State = #state { queue = QueueIn }) ->
182 {Value, QueueOut} = queue:out (QueueIn),
183 {reply, Value, State#state { queue = QueueOut }};
184 handle_call (Request, From, State) ->
185 io:format ("~p:handle_call ~p from ~p~n",[?MODULE, Request, From]),
186 {reply, ok, State}.
188 handle_cast ({in, Socket}, State = #state { queue = QueueIn }) ->
189 {noreply, State#state { queue = queue:in (Socket, QueueIn) }};
190 handle_cast (Request, State) ->
191 io:format ("~p:handle_cast ~p~n",[?MODULE, Request]),
192 {noreply, State}.
194 handle_info (Info, State) ->
195 io:format ("~p:handle_info ~p~n",[?MODULE,Info]),
196 {noreply, State}.
198 terminate (_Reason, _State) ->
201 code_change (_OldVsn, State, _Extra) ->
202 {ok, State}.
204 %%--------------------------------------------------------------------
205 %%% Helper functions
206 %%--------------------------------------------------------------------
207 parse_config ([], Config) ->
208 validate_config (Config);
209 parse_config ([{id, Id}|Rest], Config = #config{}) ->
210 parse_config (Rest, Config#config {id = Id});
211 parse_config ([{max_age_seconds, MaxAge}|Rest], Config = #config{}) ->
212 parse_config (Rest, Config#config {max_age_seconds = MaxAge});
213 parse_config ([{max_connections, MaxConnections}|Rest], Config = #config{}) ->
214 parse_config (Rest, Config#config {max_connections = MaxConnections });
215 parse_config ([_|Rest], Config = #config{}) ->
216 parse_config (Rest, Config).
218 validate_config ( Config = #config { id = Id }) ->
219 % double check non-defaulted options have values
220 case Id =/= undefined of
221 true -> {ok, Config};
222 false -> {error, bad_config}
223 end.
225 seconds_since_epoch () ->
226 {Mega, Secs, _ } = os:timestamp(),
227 Mega * 1000000 + Secs.
229 %%--------------------------------------------------------------------
230 %%% Test functions
231 %%--------------------------------------------------------------------
232 -ifdef (TEST).
233 -include_lib ("eunit/include/eunit.hrl").
236 -endif.