2 ** Copyright 2002 Addison Wesley. All Rights Reserved.
5 #include "ace/os_include/os_netdb.h"
6 #include "ace/OS_NS_sys_time.h"
7 #include "ace/OS_NS_sys_socket.h"
8 #include "ace/OS_NS_unistd.h"
9 #include "ace/OS_NS_string.h"
10 #include "ace/Event_Handler.h"
11 #include "ace/INET_Addr.h"
12 #include "ace/Get_Opt.h"
13 #include "ace/Log_Record.h"
14 #include "ace/Truncate.h"
15 #include "ace/Message_Block.h"
16 #include "ace/Message_Queue.h"
17 #include "ace/Reactor.h"
18 #include "ace/Service_Object.h"
19 #include "ace/Signal.h"
20 #include "ace/SOCK_Acceptor.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/SOCK_Stream.h"
23 #include "ace/Thread_Manager.h"
24 #include "Logging_Acceptor.h"
25 #include "CLD_export.h"
27 #if !defined (FLUSH_TIMEOUT)
28 #define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */
29 #endif /* FLUSH_TIMEOUT */
34 class CLD_Handler
: public ACE_Event_Handler
{
36 enum { QUEUE_MAX
= sizeof (ACE_Log_Record
) * ACE_IOV_MAX
};
38 //FUZZ: disable check_for_lack_ACE_OS
39 // Initialization hook method.
40 virtual int open (CLD_Connector
*);
41 virtual int close (); // Shut down hook method.
42 //FUZZ: enable check_for_lack_ACE_OS
44 // Accessor to the connection to the logging server.
45 virtual ACE_SOCK_Stream
&peer () { return peer_
; }
47 // Reactor hook methods.
48 virtual int handle_input (ACE_HANDLE handle
);
49 virtual int handle_close (ACE_HANDLE
= ACE_INVALID_HANDLE
,
50 ACE_Reactor_Mask
= 0);
53 // Forward log records to the server logging daemon.
54 virtual ACE_THR_FUNC_RETURN
forward ();
56 //FUZZ: disable check_for_lack_ACE_OS
57 // Send the buffered log records using a gather-write operation.
58 virtual int send (ACE_Message_Block
*chunk
[], size_t &count
);
59 //FUZZ: enable check_for_lack_ACE_OS
61 // Entry point into forwarder thread of control.
62 static ACE_THR_FUNC_RETURN
run_svc (void *arg
);
64 // A synchronized <ACE_Message_Queue> that queues messages.
65 ACE_Message_Queue
<ACE_SYNCH
> msg_queue_
;
67 // Manage the forwarder thread.
68 ACE_Thread_Manager thr_mgr_
;
70 // Pointer to our <CLD_Connector>.
71 CLD_Connector
*connector_
;
73 // Connection to the logging server.
74 ACE_SOCK_Stream peer_
;
80 //FUZZ: disable check_for_lack_ACE_OS
81 // Establish a connection to the logging server
82 // at the <remote_addr>.
83 int connect (CLD_Handler
*handler
,
84 const ACE_INET_Addr
&remote_addr
);
85 //FUZZ: enable check_for_lack_ACE_OS
87 // Re-establish a connection to the logging server.
91 // Pointer to the <CLD_Handler> that we're connecting.
92 CLD_Handler
*handler_
;
94 // Address at which the logging server is listening
96 ACE_INET_Addr remote_addr_
;
100 class CLD_Acceptor
: public ACE_Event_Handler
{
102 //FUZZ: disable check_for_lack_ACE_OS
103 // Initialization hook method.
104 virtual int open (CLD_Handler
*, const ACE_INET_Addr
&,
105 ACE_Reactor
* = ACE_Reactor::instance ());
106 //FUZZ: enable check_for_lack_ACE_OS
108 // Reactor hook methods.
109 virtual int handle_input (ACE_HANDLE handle
);
110 virtual int handle_close (ACE_HANDLE
= ACE_INVALID_HANDLE
,
111 ACE_Reactor_Mask
= 0);
112 virtual ACE_HANDLE
get_handle () const;
115 // Factory that passively connects <ACE_SOCK_Stream>s.
116 ACE_SOCK_Acceptor acceptor_
;
118 // Pointer to the handler of log records.
119 CLD_Handler
*handler_
;
122 /****************************************************/
124 int CLD_Handler::handle_input (ACE_HANDLE handle
)
126 ACE_Message_Block
*mblk
= 0;
127 Logging_Handler
logging_handler (handle
);
129 if (logging_handler
.recv_log_record (mblk
) != -1)
131 if (msg_queue_
.enqueue_tail (mblk
->cont ()) != -1)
135 return 0; // Success return.
143 return -1; // Error return.
147 int CLD_Handler::handle_close (ACE_HANDLE handle
,
149 { return ACE_OS::closesocket (handle
); }
152 int CLD_Handler::open (CLD_Connector
*connector
) {
153 connector_
= connector
;
154 int bufsiz
= ACE_DEFAULT_MAX_SOCKET_BUFSIZ
;
155 peer ().set_option (SOL_SOCKET
, SO_SNDBUF
,
156 &bufsiz
, sizeof bufsiz
);
157 msg_queue_
.high_water_mark (CLD_Handler::QUEUE_MAX
);
158 return thr_mgr_
.spawn (&CLD_Handler::run_svc
,
159 this, THR_SCOPE_SYSTEM
);
163 ACE_THR_FUNC_RETURN
CLD_Handler::run_svc (void *arg
) {
164 CLD_Handler
*handler
= static_cast<CLD_Handler
*> (arg
);
165 return handler
->forward ();
169 ACE_THR_FUNC_RETURN
CLD_Handler::forward () {
170 ACE_Message_Block
*chunk
[ACE_IOV_MAX
];
171 size_t message_index
= 0;
172 ACE_Time_Value
time_of_last_send (ACE_OS::gettimeofday ());
173 ACE_Time_Value timeout
;
174 ACE_Sig_Action
no_sigpipe ((ACE_SignalHandler
) SIG_IGN
);
175 ACE_Sig_Action original_action
;
176 no_sigpipe
.register_action (SIGPIPE
, &original_action
);
179 if (message_index
== 0) {
180 timeout
= ACE_OS::gettimeofday ();
181 timeout
+= FLUSH_TIMEOUT
;
183 ACE_Message_Block
*mblk
= 0;
184 if (msg_queue_
.dequeue_head (mblk
, &timeout
) == -1) {
185 if (errno
!= EWOULDBLOCK
) break;
186 else if (message_index
== 0) continue;
188 if (mblk
->size () == 0
189 && mblk
->msg_type () == ACE_Message_Block::MB_STOP
)
190 { mblk
->release (); break; }
191 chunk
[message_index
] = mblk
;
194 if (message_index
>= ACE_IOV_MAX
||
195 (ACE_OS::gettimeofday () - time_of_last_send
196 >= ACE_Time_Value(FLUSH_TIMEOUT
))) {
197 if (this->send (chunk
, message_index
) == -1) break;
198 time_of_last_send
= ACE_OS::gettimeofday ();
202 if (message_index
> 0)
203 this->send (chunk
, message_index
);
206 no_sigpipe
.restore_action (SIGPIPE
, original_action
);
211 int CLD_Handler::send (ACE_Message_Block
*chunk
[], size_t &count
) {
212 iovec iov
[ACE_IOV_MAX
];
216 for (iov_size
= 0; iov_size
< count
; ++iov_size
) {
217 iov
[iov_size
].iov_base
= chunk
[iov_size
]->rd_ptr ();
218 iov
[iov_size
].iov_len
=
219 ACE_Utils::truncate_cast
<u_long
> (chunk
[iov_size
]->length ());
221 while (peer ().sendv_n (iov
, ACE_Utils::truncate_cast
<int> (iov_size
)) == -1)
222 if (connector_
->reconnect () == -1) {
227 while (iov_size
> 0) {
228 chunk
[--iov_size
]->release (); chunk
[iov_size
] = 0;
235 int CLD_Handler::close () {
236 ACE_Message_Block
*shutdown_message
= 0;
239 ACE_Message_Block (0, ACE_Message_Block::MB_STOP
), -1);
240 msg_queue_
.enqueue_tail (shutdown_message
);
241 return thr_mgr_
.wait ();
244 /**************************************************************/
247 int CLD_Acceptor::open (CLD_Handler
*handler
,
248 const ACE_INET_Addr
&local_addr
,
250 reactor (r
); // Store the reactor pointer.
252 if (acceptor_
.open (local_addr
) == -1
253 || reactor ()->register_handler
254 (this, ACE_Event_Handler::ACCEPT_MASK
) == -1)
260 ACE_HANDLE
CLD_Acceptor::get_handle () const
261 { return acceptor_
.get_handle (); }
264 int CLD_Acceptor::handle_input (ACE_HANDLE
) {
265 ACE_SOCK_Stream peer_stream
;
266 if (acceptor_
.accept (peer_stream
) == -1) return -1;
267 else if (reactor ()->register_handler
268 (peer_stream
.get_handle (),
270 ACE_Event_Handler::READ_MASK
) == -1)
276 int CLD_Acceptor::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
) {
283 /***************************************************/
286 int CLD_Connector::connect
287 (CLD_Handler
*handler
,
288 const ACE_INET_Addr
&remote_addr
) {
289 ACE_SOCK_Connector connector
;
291 if (connector
.connect (handler
->peer (), remote_addr
) == -1)
293 else if (handler
->open (this) == -1)
294 { handler
->handle_close (); return -1; }
296 remote_addr_
= remote_addr
;
301 int CLD_Connector::reconnect () {
302 // Maximum number of times to retry connect.
303 const size_t MAX_RETRIES
= 5;
305 ACE_SOCK_Connector connector
;
306 ACE_Time_Value
timeout (1); // Start with 1 second timeout.
308 for (i
= 0; i
< MAX_RETRIES
; ++i
) {
309 if (i
> 0) ACE_OS::sleep (timeout
);
310 if (connector
.connect (handler_
->peer (), remote_addr_
,
312 timeout
*= 2; // Exponential backoff.
314 int bufsiz
= ACE_DEFAULT_MAX_SOCKET_BUFSIZ
;
315 handler_
->peer ().set_option (SOL_SOCKET
, SO_SNDBUF
,
316 &bufsiz
, sizeof bufsiz
);
320 return i
== MAX_RETRIES
? -1 : 0;
324 /*******************************************************/
326 class Client_Logging_Daemon
: public ACE_Service_Object
{
328 virtual ~Client_Logging_Daemon () {} // Turn off g++ warnings.
330 // Service Configurator hook methods.
331 virtual int init (int argc
, ACE_TCHAR
*argv
[]);
334 // Implementing these methods is left as an exercise for the reader.
335 virtual int info (ACE_TCHAR
**bufferp
, size_t length
= 0) const;
336 virtual int suspend ();
337 virtual int resume ();
341 // Receives, processes, and forwards log records.
342 CLD_Handler handler_
;
344 // Factory that passively connects the <CLD_Handler>.
345 CLD_Acceptor acceptor_
;
347 // Factory that actively connects the <CLD_Handler>.
348 CLD_Connector connector_
;
352 int Client_Logging_Daemon::init (int argc
, ACE_TCHAR
*argv
[]) {
353 u_short cld_port
= ACE_DEFAULT_SERVICE_PORT
;
354 u_short sld_port
= ACE_DEFAULT_LOGGING_SERVER_PORT
;
355 ACE_TCHAR sld_host
[MAXHOSTNAMELEN
];
356 ACE_OS::strcpy (sld_host
, ACE_LOCALHOST
);
358 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("p:r:s:"), 0);
359 get_opt
.long_option (ACE_TEXT ("client_port"), 'p',
360 ACE_Get_Opt::ARG_REQUIRED
);
361 get_opt
.long_option (ACE_TEXT ("server_port"), 'r',
362 ACE_Get_Opt::ARG_REQUIRED
);
363 get_opt
.long_option (ACE_TEXT ("server_name"), 's',
364 ACE_Get_Opt::ARG_REQUIRED
);
366 for (int c
; (c
= get_opt ()) != -1;)
368 case 'p': // Client logging daemon acceptor port number.
369 cld_port
= static_cast<u_short
> (ACE_OS::atoi (get_opt
.opt_arg ()));
371 case 'r': // Server logging daemon acceptor port number.
372 sld_port
= static_cast<u_short
> (ACE_OS::atoi (get_opt
.opt_arg ()));
374 case 's': // Server logging daemon hostname.
376 (sld_host
, get_opt
.opt_arg (), MAXHOSTNAMELEN
);
380 ACE_INET_Addr
cld_addr (cld_port
);
381 ACE_INET_Addr
sld_addr (sld_port
, sld_host
);
383 if (acceptor_
.open (&handler_
, cld_addr
) == -1)
385 else if (connector_
.connect (&handler_
, sld_addr
) == -1)
386 { acceptor_
.handle_close (); return -1; }
391 int Client_Logging_Daemon::fini () {
392 acceptor_
.handle_close ();
398 ACE_FACTORY_DEFINE (CLD
, Client_Logging_Daemon
)