2 ** Copyright 2002 Addison Wesley. All Rights Reserved.
5 #include "ace/OS_NS_string.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "ace/OS_NS_sys_socket.h"
8 #include "ace/OS_NS_sys_time.h"
9 #include "ace/INET_Addr.h"
10 #include "ace/SOCK_Acceptor.h"
11 #include "ace/SOCK_Connector.h"
12 #include "ace/SOCK_Stream.h"
13 #include "ace/Acceptor.h"
14 #include "ace/Connector.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Handle_Set.h"
17 #include "ace/Log_Record.h"
18 #include "ace/Truncate.h"
19 #include "ace/Message_Block.h"
20 #include "ace/Reactor.h"
21 #include "ace/Service_Object.h"
22 #include "ace/Signal.h"
23 #include "ace/Svc_Handler.h"
24 #include "ace/Thread_Manager.h"
25 #include "ace/os_include/os_netdb.h"
26 #include "Logging_Handler.h"
27 #include "AC_CLD_export.h"
29 #include "AC_Client_Logging_Daemon.h"
31 #include <openssl/ssl.h>
// AC_CLD_Acceptor: an ACE_Acceptor specialised for AC_Input_Handler over
// ACE_SOCK_ACCEPTOR.  It keeps a pointer to the AC_Output_Handler and owns
// one AC_Input_Handler by value.
// NOTE(review): the fused numbering jumps (35 -> 38, 39 -> 42, 42 -> 45 ...),
// so the class-key line, any access specifiers, the typedef's alias name and
// the closing brace are not visible in this listing.
35 : public ACE_Acceptor
<AC_Input_Handler
, ACE_SOCK_ACCEPTOR
> {
// Constructor: stores <handler> and passes the same pointer to the
// embedded input handler.
38 AC_CLD_Acceptor (AC_Output_Handler
*handler
= 0)
39 : output_handler_ (handler
), input_handler_ (handler
) {}
// Alias for the base-class type; the alias name sits on a line that is not
// shown (presumably PARENT, which the member definitions use -- confirm).
42 typedef ACE_Acceptor
<AC_Input_Handler
, ACE_SOCK_ACCEPTOR
>
45 // <ACE_Acceptor> factory method.
46 virtual int make_svc_handler (AC_Input_Handler
*&sh
);
48 // <ACE_Reactor> close hook method.
49 virtual int handle_close (ACE_HANDLE
= ACE_INVALID_HANDLE
,
50 ACE_Reactor_Mask
= 0);
52 // Pointer to the output handler.
53 AC_Output_Handler
*output_handler_
;
55 // Single input handler.
56 AC_Input_Handler input_handler_
;
// AC_CLD_Connector: an ACE_Connector specialised for AC_Output_Handler over
// ACE_SOCK_CONNECTOR that additionally holds OpenSSL state (ssl_ctx_, ssl_)
// and remembers the server address in remote_addr_.
// NOTE(review): several original lines are absent from this listing (e.g. 63,
// 71, 78, 82, 108, 111 and the closing brace), including the declarations of
// reconnect() and of the ssl_ctx_ / ssl_ members that the constructor names.
59 class AC_CLD_Connector
60 : public ACE_Connector
<AC_Output_Handler
, ACE_SOCK_CONNECTOR
> {
// Alias for the base-class type; the alias name itself is not shown
// (presumably PARENT, as used by the member definitions -- confirm).
62 typedef ACE_Connector
<AC_Output_Handler
, ACE_SOCK_CONNECTOR
>
// Constructor: records the handler to connect and null-initialises both
// SSL pointers; they are assigned later in open().
66 AC_CLD_Connector (AC_Output_Handler
*handler
= 0)
67 : handler_ (handler
), ssl_ctx_ (0), ssl_ (0) {}
69 // Destructor frees the SSL resources.
70 virtual ~AC_CLD_Connector () {
// NOTE(review): only the context release is visible; original line 71
// (presumably the release of ssl_) is not shown -- confirm.
72 SSL_CTX_free (ssl_ctx_
);
75 //FUZZ: disable check_for_lack_ACE_OS
76 // Initialize the Connector.
// The second parameter (an int named <flags> in the definition) is on a line
// that is not shown here.
77 virtual int open (ACE_Reactor
*r
= ACE_Reactor::instance (),
79 //FUZZ: enable check_for_lack_ACE_OS
81 // Re-establish a connection to the logging server.
85 // Connection establishment and authentication hook method.
86 virtual int connect_svc_handler
87 (AC_Output_Handler
*&svc_handler
,
88 const ACE_SOCK_Connector::PEER_ADDR
&remote_addr
,
89 ACE_Time_Value
*timeout
,
90 const ACE_SOCK_Connector::PEER_ADDR
&local_addr
,
91 int reuse_addr
, int flags
, int perms
);
// Overload that also receives <sh_copy>; its definition copies
// <svc_handler> into <sh_copy> and delegates to the overload above.
93 virtual int connect_svc_handler
94 (AC_Output_Handler
*&svc_handler
,
95 AC_Output_Handler
*&sh_copy
,
96 const ACE_SOCK_Connector::PEER_ADDR
&remote_addr
,
97 ACE_Time_Value
*timeout
,
98 const ACE_SOCK_Connector::PEER_ADDR
&local_addr
,
99 int reuse_addr
, int flags
, int perms
);
101 // Pointer to <AC_Output_Handler> we're connecting.
102 AC_Output_Handler
*handler_
;
104 // Address at which logging server listens for connections.
105 ACE_INET_Addr remote_addr_
;
107 // The SSL "context" data structure.
110 // The SSL data structure corresponding to authenticated SSL
// AC_Client_Logging_Daemon: an ACE_Service_Object that bundles the acceptor,
// the connector and the single output handler they both refer to.
// NOTE(review): access specifiers, the fini() declaration and the closing
// brace are not visible in this listing.
115 class AC_Client_Logging_Daemon
: public ACE_Service_Object
{
117 // Factory that passively connects the <AC_Input_Handler>.
118 AC_CLD_Acceptor acceptor_
;
120 // Factory that actively connects the <AC_Output_Handler>.
121 AC_CLD_Connector connector_
;
123 // The <AC_Output_Handler> connected by <AC_CLD_Connector>.
124 AC_Output_Handler output_handler_
;
// Constructor: gives both factories the address of output_handler_.
// output_handler_ is declared after acceptor_ and connector_, so its address
// is handed out before it is constructed; the visible factory constructors
// only store or forward the pointer.
// NOTE(review): AC_Input_Handler's constructor is not shown -- confirm it
// does not dereference the pointer.
128 AC_Client_Logging_Daemon ()
129 : acceptor_ (&output_handler_
),
130 connector_ (&output_handler_
) {}
132 // Service Configurator hook methods.
133 virtual int init (int argc
, ACE_TCHAR
*argv
[]);
135 //virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;
136 //virtual int suspend ();
137 //virtual int resume ();
140 /******************************************************/
142 #if !defined (FLUSH_TIMEOUT)
143 #define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */
144 #endif /* FLUSH_TIMEOUT */
// Open hook for the output handler.  <connector> is cast to
// AC_CLD_Connector*; the function then enlarges the socket send buffer,
// registers this handler with the reactor for READ events and, if the
// message queue reports ACTIVATED, sets the queue's high water mark and
// returns the result of activate (THR_SCOPE_SYSTEM).
// NOTE(review): original lines 147, 154 and 159-161 are absent from this
// listing, so the target of the cast, the action taken when registration
// fails, and the else-path/closing brace are not visible.
146 int AC_Output_Handler::open (void *connector
) {
148 static_cast<AC_CLD_Connector
*> (connector
);
// Request a send buffer of ACE_DEFAULT_MAX_SOCKET_BUFSIZ bytes; the return
// value of set_option() is not checked.
149 int bufsiz
= ACE_DEFAULT_MAX_SOCKET_BUFSIZ
;
150 peer ().set_option (SOL_SOCKET
, SO_SNDBUF
,
151 &bufsiz
, sizeof bufsiz
);
152 if (reactor ()->register_handler
153 (this, ACE_Event_Handler::READ_MASK
) == -1)
// Only when activate() on the queue returns ACTIVATED is the high water
// mark set and activate (THR_SCOPE_SYSTEM) called.
155 if (msg_queue ()->activate ()
156 == ACE_Message_Queue_Base::ACTIVATED
) {
157 msg_queue ()->high_water_mark (QUEUE_MAX
);
158 return activate (THR_SCOPE_SYSTEM
);
// Enqueues <mb> on this handler's message queue with putq(), passing
// <timeout> through, and loops for as long as putq() returns -1.
// NOTE(review): the declaration of retval (original line 164), the action
// taken when the queue state is not PULSED, and the final return are on
// lines absent from this listing.
162 int AC_Output_Handler::put (ACE_Message_Block
*mb
,
163 ACE_Time_Value
*timeout
) {
165 while ((retval
= putq (mb
, timeout
)) == -1) {
// A failed putq() is examined against the queue state; PULSED is the state
// that is treated differently from every other state.
166 if (msg_queue ()->state () != ACE_Message_Queue_Base::PULSED
)
// Reactor input callback for the output handler: removes the READ_MASK
// registration for <h> (with the DONT_CALL flag set) and then pulses the
// message queue.
// NOTE(review): presumably invoked when the server side of the connection
// becomes readable or closes, so that the service thread wakes up and
// reconnects -- confirm.  The return statement and closing brace are on
// lines absent from this listing.
172 int AC_Output_Handler::handle_input (ACE_HANDLE h
) {
174 reactor ()->remove_handler
175 (h
, ACE_Event_Handler::READ_MASK
176 | ACE_Event_Handler::DONT_CALL
);
177 msg_queue ()->pulse ();
// Service routine of the output handler: dequeues message blocks, gathers
// them in chunk[] and forwards them with send() once the buffer holds
// ACE_IOV_MAX entries or FLUSH_TIMEOUT seconds have passed since the
// previous send.  SIGPIPE is ignored while the routine runs and the previous
// disposition is restored before it ends.
// NOTE(review): the enclosing loop header (original line 190), the update of
// message_index after a block is stored (lines 207-208), several closing
// braces and the final return are absent from this listing.
181 int AC_Output_Handler::svc () {
// Blocks waiting to be sent, at most ACE_IOV_MAX of them.
182 ACE_Message_Block
*chunk
[ACE_IOV_MAX
];
183 size_t message_index
= 0;
184 ACE_Time_Value
time_of_last_send (ACE_OS::gettimeofday ());
185 ACE_Time_Value timeout
;
// Install SIG_IGN for SIGPIPE, saving the prior action in original_action.
186 ACE_Sig_Action
no_sigpipe ((ACE_SignalHandler
) SIG_IGN
);
187 ACE_Sig_Action original_action
;
188 no_sigpipe
.register_action (SIGPIPE
, &original_action
);
// With nothing buffered, (re)compute the deadline as the current time of
// day plus FLUSH_TIMEOUT seconds; it is passed to getq() below.
191 if (message_index
== 0) {
192 timeout
= ACE_OS::gettimeofday ();
193 timeout
+= FLUSH_TIMEOUT
;
195 ACE_Message_Block
*mblk
= 0;
// getq() failure handling, keyed on errno:
//   ESHUTDOWN             -> try to reconnect; leave the loop if that fails
//   other than EWOULDBLOCK -> leave the loop
//   EWOULDBLOCK with nothing buffered -> go round again
196 if (getq (mblk
, &timeout
) == -1) {
197 if (errno
== ESHUTDOWN
) {
198 if (connector_
->reconnect () == -1) break;
200 } else if (errno
!= EWOULDBLOCK
) break;
201 else if (message_index
== 0) continue;
// A zero-size block of type MB_STOP is the shutdown request: release it and
// leave the loop.
203 if (mblk
->size () == 0
204 && mblk
->msg_type () == ACE_Message_Block::MB_STOP
)
205 { mblk
->release (); break; }
206 chunk
[message_index
] = mblk
;
// Flush when the buffer is full or the flush interval has elapsed; a failed
// send() ends the loop.
209 if (message_index
>= ACE_IOV_MAX
||
210 (ACE_OS::gettimeofday () - time_of_last_send
211 >= ACE_Time_Value(FLUSH_TIMEOUT
))) {
212 if (this->send (chunk
, message_index
) == -1) break;
213 time_of_last_send
= ACE_OS::gettimeofday ();
// Send whatever is still buffered; the result of this last send() is
// ignored.
217 if (message_index
> 0)
218 this->send (chunk
, message_index
);
// Put back the SIGPIPE disposition saved at entry.
219 no_sigpipe
.restore_action (SIGPIPE
, original_action
);
// Sends the first <count> message blocks of <chunk> with one gather-write:
// an iovec entry is filled per block (rd_ptr() / length()), sendv_n() is
// retried after a reconnect attempt whenever it returns -1, and afterwards
// each block is released and its chunk[] slot set to 0.
// NOTE(review): the declaration of iov_size (original lines 225-227), the
// body run when reconnect() fails (235-238), any update of <count> and the
// return statement are absent from this listing.
223 int AC_Output_Handler::send (ACE_Message_Block
*chunk
[], size_t &count
) {
224 iovec iov
[ACE_IOV_MAX
];
// Build one iovec entry per buffered block; the length is narrowed to
// u_long through truncate_cast.
228 for (iov_size
= 0; iov_size
< count
; ++iov_size
) {
229 iov
[iov_size
].iov_base
= chunk
[iov_size
]->rd_ptr ();
230 iov
[iov_size
].iov_len
=
231 ACE_Utils::truncate_cast
<u_long
> (chunk
[iov_size
]->length ());
// Retry the gather-write for as long as it fails; each failure first asks
// the connector to reconnect.
233 while (peer ().sendv_n (iov
, ACE_Utils::truncate_cast
<int> (iov_size
)) == -1)
234 if (connector_
->reconnect () == -1) {
// Walk the array backwards, releasing every block and nulling its slot.
239 while (iov_size
> 0) {
240 chunk
[--iov_size
]->release (); chunk
[iov_size
] = 0;
246 /******************************************************/
// Open hook for the input handler: registers the peer's handle with the
// reactor for READ events and records that handle in connected_clients_.
// The void* argument is unused.
// NOTE(review): the action taken when registration fails (original line
// 252) and the return statement are absent from this listing.
248 int AC_Input_Handler::open (void *) {
249 ACE_HANDLE handle
= peer ().get_handle ();
// Registration is by explicit handle, with this object as the handler.
250 if (reactor ()->register_handler
251 (handle
, this, ACE_Event_Handler::READ_MASK
) == -1)
253 connected_clients_
.set_bit (handle
);
// Shuts the input side down: hands a zero-size MB_STOP message block to the
// output handler, removes every handle in connected_clients_ from the
// reactor (READ_MASK), and returns the result of output_handler_->wait ().
// The u_long argument is unused.
// NOTE(review): the start of the allocation statement for shutdown_message
// (original lines 259-260) is absent from this listing; the trailing ", -1)"
// suggests an allocate-or-return -1 macro -- confirm.
257 int AC_Input_Handler::close (u_long
) {
258 ACE_Message_Block
*shutdown_message
= 0;
261 ACE_Message_Block (0, ACE_Message_Block::MB_STOP
), -1);
// The return value of put() is not checked.
262 output_handler_
->put (shutdown_message
);
264 reactor ()->remove_handler
265 (connected_clients_
, ACE_Event_Handler::READ_MASK
);
266 return output_handler_
->wait ();
// Reactor input callback for a client connection: reads one log record from
// <handle> through a temporary Logging_Handler and, on success, passes
// mblk->cont () to the output handler.  Returns 0 when both steps succeed
// and -1 otherwise.
// NOTE(review): original lines 276-278 and 280-286 are absent from this
// listing, so whatever happens to <mblk> on the success and failure paths
// (e.g. releasing it) is not visible.
269 int AC_Input_Handler::handle_input (ACE_HANDLE handle
) {
270 ACE_Message_Block
*mblk
= 0;
271 Logging_Handler
logging_handler (handle
);
273 if (logging_handler
.recv_log_record (mblk
) != -1)
// Only the continuation of the received block is queued, not the block
// returned by recv_log_record() itself.
275 if (output_handler_
->put (mblk
->cont ()) != -1)
279 return 0; // Success return.
287 return -1; // Error return.
// Reactor close callback for a client connection: clears <handle> from
// connected_clients_ and returns the result of closing the socket.
// NOTE(review): the second parameter (original line 291) and the closing
// brace are absent from this listing.
290 int AC_Input_Handler::handle_close (ACE_HANDLE handle
,
292 connected_clients_
.clr_bit (handle
);
293 return ACE_OS::closesocket (handle
);
296 /********************************************************/
298 int AC_CLD_Acceptor::make_svc_handler (AC_Input_Handler
*&sh
)
299 { sh
= &input_handler_
; return 0; }
// <ACE_Reactor> close hook for the acceptor: runs the base-class
// handle_close () with its default arguments, then closes the embedded
// input handler.  The handle argument is unnamed and unused.
// NOTE(review): the second parameter (original line 303), the return
// statement and the closing brace are absent from this listing; the return
// values of both calls are discarded on the visible lines.
302 int AC_CLD_Acceptor::handle_close (ACE_HANDLE
,
304 PARENT::handle_close ();
305 input_handler_
.close ();
309 /*******************************************************/
311 #if !defined (CLD_CERTIFICATE_FILENAME)
312 # define CLD_CERTIFICATE_FILENAME "cld-cert.pem"
313 #endif /* !CLD_CERTIFICATE_FILENAME */
314 #if !defined (CLD_KEY_FILENAME)
315 # define CLD_KEY_FILENAME "cld-key.pem"
316 #endif /* !CLD_KEY_FILENAME */
// Initialises the connector: opens the base class with <r> and <flags>, then
// creates the OpenSSL client context, loads the PEM certificate and private
// key into it, checks that they match, and creates the SSL object used later
// by connect_svc_handler().  Returns -1 if the base open fails or if either
// SSL object cannot be created.
// NOTE(review): original line 328 (the key filename argument, presumably
// CLD_KEY_FILENAME), the statement run when the certificate/key checks fail
// (331), and the final return are absent from this listing.
318 int AC_CLD_Connector::open (ACE_Reactor
*r
, int flags
) {
319 if (PARENT::open (r
, flags
) != 0) return -1;
320 OpenSSL_add_ssl_algorithms ();
321 ssl_ctx_
= SSL_CTX_new (SSLv23_client_method ());
322 if (ssl_ctx_
== 0) return -1;
// All three checks must pass: certificate load, private-key load, and the
// key/certificate consistency check.
324 if (SSL_CTX_use_certificate_file (ssl_ctx_
,
325 CLD_CERTIFICATE_FILENAME
,
326 SSL_FILETYPE_PEM
) <= 0
327 || SSL_CTX_use_PrivateKey_file (ssl_ctx_
,
329 SSL_FILETYPE_PEM
) <= 0
330 || !SSL_CTX_check_private_key (ssl_ctx_
))
// NOTE(review): on this failure path ssl_ctx_ is left allocated; the
// destructor frees it.
333 ssl_
= SSL_new (ssl_ctx_
);
334 if (ssl_
== 0) return -1;
// Connection establishment and authentication hook.  Delegates the TCP
// connect to the base class, binds the SSL object to the new socket handle,
// requests peer verification, performs an SSL handshake followed at once by
// an SSL shutdown, and on success records <remote_addr> in remote_addr_.
// Returns -1 if the base connect, the handshake or the shutdown reports -1.
// NOTE(review): the handshake is shut down immediately, so SSL is presumably
// used only to authenticate and the log data travels in the clear -- confirm.
// NOTE(review): original lines 347, 351, 353 (presumably the #else), 362-363
// (final return and closing brace) are absent from this listing.
338 int AC_CLD_Connector::connect_svc_handler
339 (AC_Output_Handler
*&svc_handler
,
340 const ACE_SOCK_Connector::PEER_ADDR
&remote_addr
,
341 ACE_Time_Value
*timeout
,
342 const ACE_SOCK_Connector::PEER_ADDR
&local_addr
,
343 int reuse_addr
, int flags
, int perms
) {
344 if (PARENT::connect_svc_handler
345 (svc_handler
, remote_addr
, timeout
,
346 local_addr
, reuse_addr
, flags
, perms
) == -1) return -1;
348 #if defined (ACE_WIN32)
349 // ACE_WIN32 is the only platform where ACE_HANDLE is not an int.
350 // See ace/config-lite.h for the typedefs.
352 reinterpret_cast<int> (svc_handler
->get_handle ()));
354 SSL_set_fd (ssl_
, svc_handler
->get_handle ());
355 #endif /* ACE_WIN32 */
// Ask for the peer certificate to be verified; no verify callback is
// supplied (third argument 0).
357 SSL_set_verify (ssl_
, SSL_VERIFY_PEER
, 0);
// NOTE(review): only a result of exactly -1 is treated as failure here.
// OpenSSL documents SSL_connect() as also returning 0 for a handshake that
// failed but was shut down cleanly -- confirm whether "<= 0" is intended.
359 if (SSL_connect (ssl_
) == -1
360 || SSL_shutdown (ssl_
) == -1) return -1;
// Remember the server address; reconnect() passes remote_addr_ to connect().
361 remote_addr_
= remote_addr
;
// Overload taking an extra <sh_copy> out-parameter: stores the current
// <svc_handler> pointer in <sh_copy>, then forwards every remaining argument
// to the seven-argument connect_svc_handler() and returns its result.
// NOTE(review): the closing brace (original line 375) is absent from this
// listing.
365 int AC_CLD_Connector::connect_svc_handler
366 (AC_Output_Handler
*&svc_handler
,
367 AC_Output_Handler
*&sh_copy
,
368 const ACE_SOCK_Connector::PEER_ADDR
&remote_addr
,
369 ACE_Time_Value
*timeout
,
370 const ACE_SOCK_Connector::PEER_ADDR
&local_addr
,
371 int reuse_addr
, int flags
, int perms
) {
// The copy is taken before the connect attempt is made.
372 sh_copy
= svc_handler
;
373 return this->connect_svc_handler (svc_handler
, remote_addr
, timeout
,
374 local_addr
, reuse_addr
, flags
, perms
);
// Re-establishes the connection to the logging server: makes up to
// MAX_RETRIES connect attempts to remote_addr_ for handler_, sleeping
// <timeout> before every attempt except the first and doubling <timeout>
// after an unsuccessful one.  Returns -1 when the loop counter reaches
// MAX_RETRIES, otherwise 0.
// NOTE(review): the declaration of i (original line 381), the second
// argument of the ACE_Synch_Options constructor (384), and the statement run
// when connect() returns 0 (387, presumably a break) are absent from this
// listing.
377 int AC_CLD_Connector::reconnect () {
378 // Maximum number of times to retry connect.
379 const size_t MAX_RETRIES
= 5;
// Initial delay of 1 second.
380 ACE_Time_Value
timeout (1);
382 for (i
= 0; i
< MAX_RETRIES
; ++i
) {
383 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
,
385 if (i
> 0) ACE_OS::sleep (timeout
);
386 if (this->connect (handler_
, remote_addr_
, options
) == 0)
388 timeout
*= 2; // Exponential backoff.
390 return i
== MAX_RETRIES
? -1 : 0;
393 /******************************************************/
// Service Configurator init hook.  Parses the options -p / --client_port,
// -r / --server_port and -s / --server_name (each takes an argument), opens
// the acceptor on the client port, and connects the output handler to the
// logging server.  Returns -1 if the acceptor cannot be opened or if the
// connect fails; in the latter case the acceptor is closed first.
// Defaults: client port ACE_DEFAULT_SERVICE_PORT, server port
// ACE_DEFAULT_LOGGING_SERVER_PORT, server host ACE_LOCALHOST.
// NOTE(review): the switch header (original line 411), the break statements,
// the name of the copy function used for 's' (419), and the final
// return/closing brace are absent from this listing.
395 int AC_Client_Logging_Daemon::init
396 (int argc
, ACE_TCHAR
*argv
[]) {
397 u_short cld_port
= ACE_DEFAULT_SERVICE_PORT
;
398 u_short sld_port
= ACE_DEFAULT_LOGGING_SERVER_PORT
;
399 ACE_TCHAR sld_host
[MAXHOSTNAMELEN
];
400 ACE_OS::strcpy (sld_host
, ACE_LOCALHOST
);
402 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("p:r:s:"), 0);
// Long spellings map onto the same short option characters.
403 get_opt
.long_option (ACE_TEXT ("client_port"), 'p',
404 ACE_Get_Opt::ARG_REQUIRED
);
405 get_opt
.long_option (ACE_TEXT ("server_port"), 'r',
406 ACE_Get_Opt::ARG_REQUIRED
);
407 get_opt
.long_option (ACE_TEXT ("server_name"), 's',
408 ACE_Get_Opt::ARG_REQUIRED
);
410 for (int c
; (c
= get_opt ()) != -1;)
412 case 'p': // Client logging daemon acceptor port number.
// NOTE(review): the atoi() result is cast straight to u_short with no range
// or parse-error check on the visible lines.
413 cld_port
= static_cast<u_short
> (ACE_OS::atoi (get_opt
.opt_arg ()));
415 case 'r': // Server logging daemon acceptor port number.
416 sld_port
= static_cast<u_short
> (ACE_OS::atoi (get_opt
.opt_arg ()));
418 case 's': // Server logging daemon hostname.
// The copy into sld_host is given MAXHOSTNAMELEN as its size argument.
420 (sld_host
, get_opt
.opt_arg (), MAXHOSTNAMELEN
);
424 ACE_INET_Addr
cld_addr (cld_port
);
425 ACE_INET_Addr
sld_addr (sld_port
, sld_host
);
427 if (acceptor_
.open (cld_addr
) == -1) return -1;
// connect() is given a named pointer variable rather than
// &output_handler_ directly.
428 AC_Output_Handler
*oh
= &output_handler_
;
429 if (connector_
.connect (oh
, sld_addr
) == -1)
430 { acceptor_
.close (); return -1; }
434 int AC_Client_Logging_Daemon::fini ()
435 { return acceptor_
.close (); }
437 ACE_FACTORY_DEFINE (AC_CLD
, AC_Client_Logging_Daemon
)