1 -module (lwes_emitter_udp_pool
).
3 -behaviour (gen_server
).
6 -export ([start_link
/1,
%% gen_server state for one pool.
%%   id    - the pool id taken from the #config record passed to init/1
%%   queue - FIFO of sockets added by the {in, Socket} cast and removed by
%%           the {out} call; starts out empty
-record(state, {
    id,
    queue = queue:new()
}).
% there are 2 types of records stored in the ets table, we'll use the
20 % first element as the keypos, so either the port for a socket, or
22 -record (connection
, {socket
,
23 create_time_epoch_seconds
,
24 access_time_epoch_seconds
,
%% Tuple positions of individual #connection fields.  They are passed to
%% ets:update_counter/3 and ets:update_element/3 so that a single field of a
%% stored connection row can be read or written in place.
-define(CONNECTION_CREATE_TIME_INDEX, #connection.create_time_epoch_seconds).
-define(CONNECTION_LAST_ACCESS_INDEX, #connection.access_time_epoch_seconds).
-define(CONNECTION_MAX_AGE_INDEX, #connection.max_age_seconds).
32 max_connections
= 65535, % set unreasonably high?
%% Tuple positions of individual #config fields.  The counter fields
%% (active, busy) are adjusted through ets:update_counter/3 on the config
%% row, and the max field is read in the same call with an increment of 0.
-define(CONFIG_MAX_INDEX, #config.max_connections).
-define(CONFIG_ACTIVE_INDEX, #config.active).
-define(CONFIG_BUSY_INDEX, #config.busy).
-define(CONFIG_MAX_AGE_INDEX, #config.max_age_seconds).
%% Atom constant lwes_emitters.
%% NOTE(review): not referenced in the portion of the file visible here;
%% presumably an ets table name -- confirm against the rest of the module.
-define(TABLE, lwes_emitters).
43 %%====================================================================
45 %%====================================================================
46 start_link (Config
= #config
{id
= Id
}) ->
47 gen_server:start_link({local
, Id
}, ?MODULE
, [Config
], []);
48 start_link (ConfigList
) when is_list (ConfigList
) ->
49 case parse_config (ConfigList
, #config
{}) of
50 {error
, E
} -> {stop
, {error
, E
}};
51 {ok
, Config
}-> start_link (Config
)
54 send (Id
, Address
, Packet
) ->
57 {error
, {checkout
, Error
}}; % Error is {error, busy} or {error, down}
59 case lwes_net_udp:send (Socket
, Address
, Packet
) of
61 checkin (Id
, Socket
, error
),
62 {error
, {call
, Error
}};
%% Synchronously ask the pool server Id for the socket at the front of its
%% queue.  The reply is the first element of what queue:out/1 returned in
%% the server: {value, Socket} when something was queued, otherwise the
%% atom empty.
out(Id) ->
    Request = {out},
    gen_server:call(Id, Request).
%% Asynchronously hand Socket to the pool server Id, which appends it to the
%% rear of its queue.  Always returns ok (gen_server:cast/2 semantics).
in(Id, Socket) ->
    Message = {in, Socket},
    gen_server:cast(Id, Message).
75 {value
, Socket
} -> Socket
78 checkin (Id
, Socket
, error
) ->
81 checkin (Id
, Socket
) ->
82 % at checkin, we'll do all our housekeeping
85 Now
= seconds_since_epoch(),
87 % fetch out the start time and set the last access
89 ets:update_counter (Id
, Socket
,
90 [ {?CONNECTION_CREATE_TIME_INDEX
, 0},
91 {?CONNECTION_MAX_AGE_INDEX
, 0} ] ),
% check to see if we've been alive for more than the allotted alive time
94 case Now
- Start
> MaxAge
of
96 % alive too long, so close
99 ets:update_element (Id
, Socket
,
100 {?CONNECTION_LAST_ACCESS_INDEX
, Now
}),
105 % optimistically update the active connections, so we don't go above max
107 ets:update_counter (Id
, Id
,
108 [ {?CONFIG_MAX_INDEX
,0},
109 {?CONFIG_ACTIVE_INDEX
,1}
112 % then check if the active connections are more than the max
115 % we decrement active connections because we optimistically incremented
116 % above, also increment busy index.
117 ets:update_counter (Id
, Id
, [{?CONFIG_BUSY_INDEX
,1},
118 {?CONFIG_ACTIVE_INDEX
,-1}]),
121 case ets:lookup (Id
, Id
) of
122 [#config
{ max_age_seconds
= MaxAge
} ] ->
% address doesn't matter for emitters; it's added at send time
124 Dummy
= lwes_net_udp:new (emitter
, {"127.0.0.1",9191}),
125 case lwes_net_udp:open (emitter
, Dummy
) of
127 % Let this gen_server be the controlling process, not the
128 % process opening the connection, this allows the process
129 % opening the connection to die without killing the connection
130 ok
= gen_udp:controlling_process (Socket
, whereis (Id
)),
132 Now
= seconds_since_epoch(),
134 #connection
{ socket
= Socket
,
135 create_time_epoch_seconds
= Now
,
136 access_time_epoch_seconds
= Now
,
137 max_age_seconds
= MaxAge
141 % we decrement active connections because we optimistically
142 % incremented above, also increment busy index.
143 ets:update_counter (Id
, Id
, [{?CONFIG_BUSY_INDEX
,1},
144 {?CONFIG_ACTIVE_INDEX
,-1}]),
146 error_logger:error_msg ("Unexpected connect error 1 : ~p", [E
]),
150 % we decrement active connections because we optimistically
151 % incremented above, also increment busy index.
152 ets:update_counter (Id
, Id
, [{?CONFIG_BUSY_INDEX
,1},
153 {?CONFIG_ACTIVE_INDEX
,-1}]),
155 error_logger:error_msg ("Unexpected connect error 2 : ~p", [E2
]),
%% Retire Socket from pool Id.
%%
%% In order: decrement the active-connection counter on the config row
%% (stored in ets table Id under key Id), remove the row keyed by Socket from
%% the same table, then close the socket itself.  Returns whatever
%% lwes_net_udp:close/1 returns.
close(Id, Socket) ->
    OneLessActive = [{?CONFIG_ACTIVE_INDEX, -1}],
    ets:update_counter(Id, Id, OneLessActive),
    ets:delete(Id, Socket),
    lwes_net_udp:close(Socket).
165 %%====================================================================
166 %% gen_server callbacks
167 %%====================================================================
168 init([Config
= #config
{ id
= Id
}]) ->
174 {write_concurrency
, true
}
176 ets:insert_new (Id
, Config
),
177 {ok
, #state
{ id
= Id
}}.
179 handle_call ({size}, _From
, State
= #state
{ queue
= QueueIn
}) ->
180 {reply
, queue:len (QueueIn
), State
};
181 handle_call ({out
}, _From
, State
= #state
{ queue
= QueueIn
}) ->
182 {Value
, QueueOut
} = queue:out (QueueIn
),
183 {reply
, Value
, State#state
{ queue
= QueueOut
}};
184 handle_call (Request
, From
, State
) ->
185 io:format ("~p:handle_call ~p from ~p~n",[?MODULE
, Request
, From
]),
188 handle_cast ({in
, Socket
}, State
= #state
{ queue
= QueueIn
}) ->
189 {noreply
, State#state
{ queue
= queue:in (Socket
, QueueIn
) }};
190 handle_cast (Request
, State
) ->
191 io:format ("~p:handle_cast ~p~n",[?MODULE
, Request
]),
194 handle_info (Info
, State
) ->
195 io:format ("~p:handle_info ~p~n",[?MODULE
,Info
]),
198 terminate (_Reason
, _State
) ->
201 code_change (_OldVsn
, State
, _Extra
) ->
204 %%--------------------------------------------------------------------
206 %%--------------------------------------------------------------------
%% Fold a list of {Key, Value} options into a #config record.
%%
%% Recognised keys are id, max_age_seconds and max_connections; each one
%% overwrites the corresponding record field.  Any other list element is
%% ignored.  Once the list is exhausted the accumulated record is handed to
%% validate_config/1 and its result is returned.
parse_config([], Config) ->
    validate_config(Config);
parse_config([Option | Rest], Config = #config{}) ->
    Updated =
        case Option of
            {id, Id} ->
                Config#config{id = Id};
            {max_age_seconds, MaxAge} ->
                Config#config{max_age_seconds = MaxAge};
            {max_connections, MaxConnections} ->
                Config#config{max_connections = MaxConnections};
            _ ->
                % unrecognised option: leave the config untouched
                Config
        end,
    parse_config(Rest, Updated).
218 validate_config ( Config
= #config
{ id
= Id
}) ->
219 % double check non-defaulted options have values
220 case Id
=/= undefined
of
221 true
-> {ok
, Config
};
222 false
-> {error
, bad_config
}
%% Current OS wall-clock time as whole seconds, computed from the
%% {MegaSecs, Secs, MicroSecs} triple of os:timestamp/0 (microseconds are
%% discarded).
seconds_since_epoch() ->
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    (MegaSecs * 1000000) + Secs.
229 %%--------------------------------------------------------------------
231 %%--------------------------------------------------------------------
233 -include_lib ("eunit/include/eunit.hrl").