Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / examples / C++NPv2 / Client_Logging_Daemon.cpp
blobce6f20248b0c8631840168952037830e56271e4d
1 /*
2 ** Copyright 2002 Addison Wesley. All Rights Reserved.
3 */
5 #include "ace/os_include/os_netdb.h"
6 #include "ace/OS_NS_sys_time.h"
7 #include "ace/OS_NS_sys_socket.h"
8 #include "ace/OS_NS_unistd.h"
9 #include "ace/OS_NS_string.h"
10 #include "ace/Event_Handler.h"
11 #include "ace/INET_Addr.h"
12 #include "ace/Get_Opt.h"
13 #include "ace/Log_Record.h"
14 #include "ace/Truncate.h"
15 #include "ace/Message_Block.h"
16 #include "ace/Message_Queue.h"
17 #include "ace/Reactor.h"
18 #include "ace/Service_Object.h"
19 #include "ace/Signal.h"
20 #include "ace/SOCK_Acceptor.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/SOCK_Stream.h"
23 #include "ace/Thread_Manager.h"
24 #include "Logging_Acceptor.h"
25 #include "CLD_export.h"
// Interval, in seconds, used by the forwarder when deciding whether
// buffered log records are due to be sent.  A build may predefine
// FLUSH_TIMEOUT to override the default.
#ifndef FLUSH_TIMEOUT
#  define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */
#endif /* FLUSH_TIMEOUT */
32 class CLD_Connector;
34 class CLD_Handler : public ACE_Event_Handler {
35 public:
36 enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX };
38 //FUZZ: disable check_for_lack_ACE_OS
39 // Initialization hook method.
40 virtual int open (CLD_Connector *);
41 virtual int close (); // Shut down hook method.
42 //FUZZ: enable check_for_lack_ACE_OS
44 // Accessor to the connection to the logging server.
45 virtual ACE_SOCK_Stream &peer () { return peer_; }
47 // Reactor hook methods.
48 virtual int handle_input (ACE_HANDLE handle);
49 virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
50 ACE_Reactor_Mask = 0);
52 protected:
53 // Forward log records to the server logging daemon.
54 virtual ACE_THR_FUNC_RETURN forward ();
56 //FUZZ: disable check_for_lack_ACE_OS
57 // Send the buffered log records using a gather-write operation.
58 virtual int send (ACE_Message_Block *chunk[], size_t &count);
59 //FUZZ: enable check_for_lack_ACE_OS
61 // Entry point into forwarder thread of control.
62 static ACE_THR_FUNC_RETURN run_svc (void *arg);
64 // A synchronized <ACE_Message_Queue> that queues messages.
65 ACE_Message_Queue<ACE_SYNCH> msg_queue_;
67 // Manage the forwarder thread.
68 ACE_Thread_Manager thr_mgr_;
70 // Pointer to our <CLD_Connector>.
71 CLD_Connector *connector_;
73 // Connection to the logging server.
74 ACE_SOCK_Stream peer_;
78 class CLD_Connector {
79 public:
80 //FUZZ: disable check_for_lack_ACE_OS
81 // Establish a connection to the logging server
82 // at the <remote_addr>.
83 int connect (CLD_Handler *handler,
84 const ACE_INET_Addr &remote_addr);
85 //FUZZ: enable check_for_lack_ACE_OS
87 // Re-establish a connection to the logging server.
88 int reconnect ();
90 private:
91 // Pointer to the <CLD_Handler> that we're connecting.
92 CLD_Handler *handler_;
94 // Address at which the logging server is listening
95 // for connections.
96 ACE_INET_Addr remote_addr_;
100 class CLD_Acceptor : public ACE_Event_Handler {
101 public:
102 //FUZZ: disable check_for_lack_ACE_OS
103 // Initialization hook method.
104 virtual int open (CLD_Handler *, const ACE_INET_Addr &,
105 ACE_Reactor * = ACE_Reactor::instance ());
106 //FUZZ: enable check_for_lack_ACE_OS
108 // Reactor hook methods.
109 virtual int handle_input (ACE_HANDLE handle);
110 virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
111 ACE_Reactor_Mask = 0);
112 virtual ACE_HANDLE get_handle () const;
114 protected:
115 // Factory that passively connects <ACE_SOCK_Stream>s.
116 ACE_SOCK_Acceptor acceptor_;
118 // Pointer to the handler of log records.
119 CLD_Handler *handler_;
122 /****************************************************/
124 int CLD_Handler::handle_input (ACE_HANDLE handle)
126 ACE_Message_Block *mblk = 0;
127 Logging_Handler logging_handler (handle);
129 if (logging_handler.recv_log_record (mblk) != -1)
131 if (msg_queue_.enqueue_tail (mblk->cont ()) != -1)
133 mblk->cont (0);
134 mblk->release ();
135 return 0; // Success return.
137 else
139 mblk->release ();
143 return -1; // Error return.
147 int CLD_Handler::handle_close (ACE_HANDLE handle,
148 ACE_Reactor_Mask)
149 { return ACE_OS::closesocket (handle); }
152 int CLD_Handler::open (CLD_Connector *connector) {
153 connector_ = connector;
154 int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
155 peer ().set_option (SOL_SOCKET, SO_SNDBUF,
156 &bufsiz, sizeof bufsiz);
157 msg_queue_.high_water_mark (CLD_Handler::QUEUE_MAX);
158 return thr_mgr_.spawn (&CLD_Handler::run_svc,
159 this, THR_SCOPE_SYSTEM);
163 ACE_THR_FUNC_RETURN CLD_Handler::run_svc (void *arg) {
164 CLD_Handler *handler = static_cast<CLD_Handler *> (arg);
165 return handler->forward ();
169 ACE_THR_FUNC_RETURN CLD_Handler::forward () {
170 ACE_Message_Block *chunk[ACE_IOV_MAX];
171 size_t message_index = 0;
172 ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ());
173 ACE_Time_Value timeout;
174 ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
175 ACE_Sig_Action original_action;
176 no_sigpipe.register_action (SIGPIPE, &original_action);
178 for (;;) {
179 if (message_index == 0) {
180 timeout = ACE_OS::gettimeofday ();
181 timeout += FLUSH_TIMEOUT;
183 ACE_Message_Block *mblk = 0;
184 if (msg_queue_.dequeue_head (mblk, &timeout) == -1) {
185 if (errno != EWOULDBLOCK) break;
186 else if (message_index == 0) continue;
187 } else {
188 if (mblk->size () == 0
189 && mblk->msg_type () == ACE_Message_Block::MB_STOP)
190 { mblk->release (); break; }
191 chunk[message_index] = mblk;
192 ++message_index;
194 if (message_index >= ACE_IOV_MAX ||
195 (ACE_OS::gettimeofday () - time_of_last_send
196 >= ACE_Time_Value(FLUSH_TIMEOUT))) {
197 if (this->send (chunk, message_index) == -1) break;
198 time_of_last_send = ACE_OS::gettimeofday ();
202 if (message_index > 0)
203 this->send (chunk, message_index);
205 msg_queue_.close ();
206 no_sigpipe.restore_action (SIGPIPE, original_action);
207 return 0;
211 int CLD_Handler::send (ACE_Message_Block *chunk[], size_t &count) {
212 iovec iov[ACE_IOV_MAX];
213 size_t iov_size;
214 int result = 0;
216 for (iov_size = 0; iov_size < count; ++iov_size) {
217 iov[iov_size].iov_base = chunk[iov_size]->rd_ptr ();
218 iov[iov_size].iov_len =
219 ACE_Utils::truncate_cast<u_long> (chunk[iov_size]->length ());
221 while (peer ().sendv_n (iov, ACE_Utils::truncate_cast<int> (iov_size)) == -1)
222 if (connector_->reconnect () == -1) {
223 result = -1;
224 break;
227 while (iov_size > 0) {
228 chunk[--iov_size]->release (); chunk[iov_size] = 0;
230 count = iov_size;
231 return result;
235 int CLD_Handler::close () {
236 ACE_Message_Block *shutdown_message = 0;
237 ACE_NEW_RETURN
238 (shutdown_message,
239 ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1);
240 msg_queue_.enqueue_tail (shutdown_message);
241 return thr_mgr_.wait ();
244 /**************************************************************/
247 int CLD_Acceptor::open (CLD_Handler *handler,
248 const ACE_INET_Addr &local_addr,
249 ACE_Reactor *r) {
250 reactor (r); // Store the reactor pointer.
251 handler_ = handler;
252 if (acceptor_.open (local_addr) == -1
253 || reactor ()->register_handler
254 (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
255 return -1;
256 return 0;
260 ACE_HANDLE CLD_Acceptor::get_handle () const
261 { return acceptor_.get_handle (); }
264 int CLD_Acceptor::handle_input (ACE_HANDLE) {
265 ACE_SOCK_Stream peer_stream;
266 if (acceptor_.accept (peer_stream) == -1) return -1;
267 else if (reactor ()->register_handler
268 (peer_stream.get_handle (),
269 handler_,
270 ACE_Event_Handler::READ_MASK) == -1)
271 return -1;
272 else return 0;
276 int CLD_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask) {
277 acceptor_.close ();
278 handler_->close ();
279 return 0;
283 /***************************************************/
286 int CLD_Connector::connect
287 (CLD_Handler *handler,
288 const ACE_INET_Addr &remote_addr) {
289 ACE_SOCK_Connector connector;
291 if (connector.connect (handler->peer (), remote_addr) == -1)
292 return -1;
293 else if (handler->open (this) == -1)
294 { handler->handle_close (); return -1; }
295 handler_ = handler;
296 remote_addr_ = remote_addr;
297 return 0;
301 int CLD_Connector::reconnect () {
302 // Maximum number of times to retry connect.
303 const size_t MAX_RETRIES = 5;
305 ACE_SOCK_Connector connector;
306 ACE_Time_Value timeout (1); // Start with 1 second timeout.
307 size_t i;
308 for (i = 0; i < MAX_RETRIES; ++i) {
309 if (i > 0) ACE_OS::sleep (timeout);
310 if (connector.connect (handler_->peer (), remote_addr_,
311 &timeout) == -1)
312 timeout *= 2; // Exponential backoff.
313 else {
314 int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
315 handler_->peer ().set_option (SOL_SOCKET, SO_SNDBUF,
316 &bufsiz, sizeof bufsiz);
317 break;
320 return i == MAX_RETRIES ? -1 : 0;
324 /*******************************************************/
326 class Client_Logging_Daemon : public ACE_Service_Object {
327 public:
328 virtual ~Client_Logging_Daemon () {} // Turn off g++ warnings.
330 // Service Configurator hook methods.
331 virtual int init (int argc, ACE_TCHAR *argv[]);
332 virtual int fini ();
333 #if 0
334 // Implementing these methods is left as an exercise for the reader.
335 virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;
336 virtual int suspend ();
337 virtual int resume ();
338 #endif
340 protected:
341 // Receives, processes, and forwards log records.
342 CLD_Handler handler_;
344 // Factory that passively connects the <CLD_Handler>.
345 CLD_Acceptor acceptor_;
347 // Factory that actively connects the <CLD_Handler>.
348 CLD_Connector connector_;
352 int Client_Logging_Daemon::init (int argc, ACE_TCHAR *argv[]) {
353 u_short cld_port = ACE_DEFAULT_SERVICE_PORT;
354 u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT;
355 ACE_TCHAR sld_host[MAXHOSTNAMELEN];
356 ACE_OS::strcpy (sld_host, ACE_LOCALHOST);
358 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0);
359 get_opt.long_option (ACE_TEXT ("client_port"), 'p',
360 ACE_Get_Opt::ARG_REQUIRED);
361 get_opt.long_option (ACE_TEXT ("server_port"), 'r',
362 ACE_Get_Opt::ARG_REQUIRED);
363 get_opt.long_option (ACE_TEXT ("server_name"), 's',
364 ACE_Get_Opt::ARG_REQUIRED);
366 for (int c; (c = get_opt ()) != -1;)
367 switch (c) {
368 case 'p': // Client logging daemon acceptor port number.
369 cld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));
370 break;
371 case 'r': // Server logging daemon acceptor port number.
372 sld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));
373 break;
374 case 's': // Server logging daemon hostname.
375 ACE_OS::strsncpy
376 (sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN);
377 break;
380 ACE_INET_Addr cld_addr (cld_port);
381 ACE_INET_Addr sld_addr (sld_port, sld_host);
383 if (acceptor_.open (&handler_, cld_addr) == -1)
384 return -1;
385 else if (connector_.connect (&handler_, sld_addr) == -1)
386 { acceptor_.handle_close (); return -1; }
387 return 0;
391 int Client_Logging_Daemon::fini () {
392 acceptor_.handle_close ();
393 handler_.close ();
394 return 0;
// Invokes ACE's ACE_FACTORY_DEFINE macro with the CLD prefix and the
// Client_Logging_Daemon class.
// NOTE(review): presumably this emits the exported factory function
// through which the Service Configurator creates the service --
// confirm against the macro's definition in ACE.
398 ACE_FACTORY_DEFINE (CLD, Client_Logging_Daemon)