Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / C++NPv2 / AC_Client_Logging_Daemon.cpp
blob2ca3e6e97bf345f06318882a05b671ee2584f3e2
1 /*
2 ** Copyright 2002 Addison Wesley. All Rights Reserved.
3 */
5 #include "ace/OS_NS_string.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "ace/OS_NS_sys_socket.h"
8 #include "ace/OS_NS_sys_time.h"
9 #include "ace/INET_Addr.h"
10 #include "ace/SOCK_Acceptor.h"
11 #include "ace/SOCK_Connector.h"
12 #include "ace/SOCK_Stream.h"
13 #include "ace/Acceptor.h"
14 #include "ace/Connector.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Handle_Set.h"
17 #include "ace/Log_Record.h"
18 #include "ace/Truncate.h"
19 #include "ace/Message_Block.h"
20 #include "ace/Reactor.h"
21 #include "ace/Service_Object.h"
22 #include "ace/Signal.h"
23 #include "ace/Svc_Handler.h"
24 #include "ace/Thread_Manager.h"
25 #include "ace/os_include/os_netdb.h"
26 #include "Logging_Handler.h"
27 #include "AC_CLD_export.h"
29 #include "AC_Client_Logging_Daemon.h"
31 #include <openssl/ssl.h>
34 class AC_CLD_Acceptor
35 : public ACE_Acceptor<AC_Input_Handler, ACE_SOCK_ACCEPTOR> {
36 public:
37 // Constructor.
38 AC_CLD_Acceptor (AC_Output_Handler *handler = 0)
39 : output_handler_ (handler), input_handler_ (handler) {}
41 protected:
42 typedef ACE_Acceptor<AC_Input_Handler, ACE_SOCK_ACCEPTOR>
43 PARENT;
45 // <ACE_Acceptor> factory method.
46 virtual int make_svc_handler (AC_Input_Handler *&sh);
48 // <ACE_Reactor> close hook method.
49 virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
50 ACE_Reactor_Mask = 0);
52 // Pointer to the output handler.
53 AC_Output_Handler *output_handler_;
55 // Single input handler.
56 AC_Input_Handler input_handler_;
59 class AC_CLD_Connector
60 : public ACE_Connector<AC_Output_Handler, ACE_SOCK_CONNECTOR> {
61 public:
62 typedef ACE_Connector<AC_Output_Handler, ACE_SOCK_CONNECTOR>
63 PARENT;
65 // Constructor.
66 AC_CLD_Connector (AC_Output_Handler *handler = 0)
67 : handler_ (handler), ssl_ctx_ (0), ssl_ (0) {}
69 // Destructor frees the SSL resources.
70 virtual ~AC_CLD_Connector () {
71 SSL_free (ssl_);
72 SSL_CTX_free (ssl_ctx_);
75 //FUZZ: disable check_for_lack_ACE_OS
76 // Initialize the Connector.
77 virtual int open (ACE_Reactor *r = ACE_Reactor::instance (),
78 int flags = 0);
79 //FUZZ: enable check_for_lack_ACE_OS
81 // Re-establish a connection to the logging server.
82 int reconnect ();
84 protected:
85 // Connection establishment and authentication hook method.
86 virtual int connect_svc_handler
87 (AC_Output_Handler *&svc_handler,
88 const ACE_SOCK_Connector::PEER_ADDR &remote_addr,
89 ACE_Time_Value *timeout,
90 const ACE_SOCK_Connector::PEER_ADDR &local_addr,
91 int reuse_addr, int flags, int perms);
93 virtual int connect_svc_handler
94 (AC_Output_Handler *&svc_handler,
95 AC_Output_Handler *&sh_copy,
96 const ACE_SOCK_Connector::PEER_ADDR &remote_addr,
97 ACE_Time_Value *timeout,
98 const ACE_SOCK_Connector::PEER_ADDR &local_addr,
99 int reuse_addr, int flags, int perms);
101 // Pointer to <AC_Output_Handler> we're connecting.
102 AC_Output_Handler *handler_;
104 // Address at which logging server listens for connections.
105 ACE_INET_Addr remote_addr_;
107 // The SSL "context" data structure.
108 SSL_CTX *ssl_ctx_;
110 // The SSL data structure corresponding to authenticated SSL
111 // connections.
112 SSL *ssl_;
115 class AC_Client_Logging_Daemon : public ACE_Service_Object {
116 protected:
117 // Factory that passively connects the <AC_Input_Handler>.
118 AC_CLD_Acceptor acceptor_;
120 // Factory that actively connects the <AC_Output_Handler>.
121 AC_CLD_Connector connector_;
123 // The <AC_Output_Handler> connected by <AC_CLD_Connector>.
124 AC_Output_Handler output_handler_;
126 public:
127 // Constructor.
128 AC_Client_Logging_Daemon ()
129 : acceptor_ (&output_handler_),
130 connector_ (&output_handler_) {}
132 // Service Configurator hook methods.
133 virtual int init (int argc, ACE_TCHAR *argv[]);
134 virtual int fini ();
135 //virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;
136 //virtual int suspend ();
137 //virtual int resume ();
140 /******************************************************/
// Flush interval in seconds for buffered log records; may be
// overridden on the compiler command line.
#ifndef FLUSH_TIMEOUT
#  define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */
#endif /* FLUSH_TIMEOUT */
146 int AC_Output_Handler::open (void *connector) {
147 connector_ =
148 static_cast<AC_CLD_Connector *> (connector);
149 int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
150 peer ().set_option (SOL_SOCKET, SO_SNDBUF,
151 &bufsiz, sizeof bufsiz);
152 if (reactor ()->register_handler
153 (this, ACE_Event_Handler::READ_MASK) == -1)
154 return -1;
155 if (msg_queue ()->activate ()
156 == ACE_Message_Queue_Base::ACTIVATED) {
157 msg_queue ()->high_water_mark (QUEUE_MAX);
158 return activate (THR_SCOPE_SYSTEM);
159 } else return 0;
162 int AC_Output_Handler::put (ACE_Message_Block *mb,
163 ACE_Time_Value *timeout) {
164 int retval;
165 while ((retval = putq (mb, timeout)) == -1) {
166 if (msg_queue ()->state () != ACE_Message_Queue_Base::PULSED)
167 break;
169 return retval;
172 int AC_Output_Handler::handle_input (ACE_HANDLE h) {
173 peer ().close ();
174 reactor ()->remove_handler
175 (h, ACE_Event_Handler::READ_MASK
176 | ACE_Event_Handler::DONT_CALL);
177 msg_queue ()->pulse ();
178 return 0;
181 int AC_Output_Handler::svc () {
182 ACE_Message_Block *chunk[ACE_IOV_MAX];
183 size_t message_index = 0;
184 ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ());
185 ACE_Time_Value timeout;
186 ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
187 ACE_Sig_Action original_action;
188 no_sigpipe.register_action (SIGPIPE, &original_action);
190 for (;;) {
191 if (message_index == 0) {
192 timeout = ACE_OS::gettimeofday ();
193 timeout += FLUSH_TIMEOUT;
195 ACE_Message_Block *mblk = 0;
196 if (getq (mblk, &timeout) == -1) {
197 if (errno == ESHUTDOWN) {
198 if (connector_->reconnect () == -1) break;
199 continue;
200 } else if (errno != EWOULDBLOCK) break;
201 else if (message_index == 0) continue;
202 } else {
203 if (mblk->size () == 0
204 && mblk->msg_type () == ACE_Message_Block::MB_STOP)
205 { mblk->release (); break; }
206 chunk[message_index] = mblk;
207 ++message_index;
209 if (message_index >= ACE_IOV_MAX ||
210 (ACE_OS::gettimeofday () - time_of_last_send
211 >= ACE_Time_Value(FLUSH_TIMEOUT))) {
212 if (this->send (chunk, message_index) == -1) break;
213 time_of_last_send = ACE_OS::gettimeofday ();
217 if (message_index > 0)
218 this->send (chunk, message_index);
219 no_sigpipe.restore_action (SIGPIPE, original_action);
220 return 0;
223 int AC_Output_Handler::send (ACE_Message_Block *chunk[], size_t &count) {
224 iovec iov[ACE_IOV_MAX];
225 size_t iov_size;
226 int result = 0;
228 for (iov_size = 0; iov_size < count; ++iov_size) {
229 iov[iov_size].iov_base = chunk[iov_size]->rd_ptr ();
230 iov[iov_size].iov_len =
231 ACE_Utils::truncate_cast<u_long> (chunk[iov_size]->length ());
233 while (peer ().sendv_n (iov, ACE_Utils::truncate_cast<int> (iov_size)) == -1)
234 if (connector_->reconnect () == -1) {
235 result = -1;
236 break;
239 while (iov_size > 0) {
240 chunk[--iov_size]->release (); chunk[iov_size] = 0;
242 count = iov_size;
243 return result;
246 /******************************************************/
248 int AC_Input_Handler::open (void *) {
249 ACE_HANDLE handle = peer ().get_handle ();
250 if (reactor ()->register_handler
251 (handle, this, ACE_Event_Handler::READ_MASK) == -1)
252 return -1;
253 connected_clients_.set_bit (handle);
254 return 0;
257 int AC_Input_Handler::close (u_long) {
258 ACE_Message_Block *shutdown_message = 0;
259 ACE_NEW_RETURN
260 (shutdown_message,
261 ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1);
262 output_handler_->put (shutdown_message);
264 reactor ()->remove_handler
265 (connected_clients_, ACE_Event_Handler::READ_MASK);
266 return output_handler_->wait ();
269 int AC_Input_Handler::handle_input (ACE_HANDLE handle) {
270 ACE_Message_Block *mblk = 0;
271 Logging_Handler logging_handler (handle);
273 if (logging_handler.recv_log_record (mblk) != -1)
275 if (output_handler_->put (mblk->cont ()) != -1)
277 mblk->cont (0);
278 mblk->release ();
279 return 0; // Success return.
281 else
283 mblk->release ();
287 return -1; // Error return.
290 int AC_Input_Handler::handle_close (ACE_HANDLE handle,
291 ACE_Reactor_Mask) {
292 connected_clients_.clr_bit (handle);
293 return ACE_OS::closesocket (handle);
296 /********************************************************/
298 int AC_CLD_Acceptor::make_svc_handler (AC_Input_Handler *&sh)
299 { sh = &input_handler_; return 0; }
302 int AC_CLD_Acceptor::handle_close (ACE_HANDLE,
303 ACE_Reactor_Mask) {
304 PARENT::handle_close ();
305 input_handler_.close ();
306 return 0;
309 /*******************************************************/
// Default PEM file names for the certificate and private key loaded
// by AC_CLD_Connector::open(); either may be overridden at build time.
#ifndef CLD_CERTIFICATE_FILENAME
#  define CLD_CERTIFICATE_FILENAME "cld-cert.pem"
#endif /* !CLD_CERTIFICATE_FILENAME */
#ifndef CLD_KEY_FILENAME
#  define CLD_KEY_FILENAME "cld-key.pem"
#endif /* !CLD_KEY_FILENAME */
318 int AC_CLD_Connector::open (ACE_Reactor *r, int flags) {
319 if (PARENT::open (r, flags) != 0) return -1;
320 OpenSSL_add_ssl_algorithms ();
321 ssl_ctx_ = SSL_CTX_new (SSLv23_client_method ());
322 if (ssl_ctx_ == 0) return -1;
324 if (SSL_CTX_use_certificate_file (ssl_ctx_,
325 CLD_CERTIFICATE_FILENAME,
326 SSL_FILETYPE_PEM) <= 0
327 || SSL_CTX_use_PrivateKey_file (ssl_ctx_,
328 CLD_KEY_FILENAME,
329 SSL_FILETYPE_PEM) <= 0
330 || !SSL_CTX_check_private_key (ssl_ctx_))
331 return -1;
333 ssl_ = SSL_new (ssl_ctx_);
334 if (ssl_ == 0) return -1;
335 return 0;
338 int AC_CLD_Connector::connect_svc_handler
339 (AC_Output_Handler *&svc_handler,
340 const ACE_SOCK_Connector::PEER_ADDR &remote_addr,
341 ACE_Time_Value *timeout,
342 const ACE_SOCK_Connector::PEER_ADDR &local_addr,
343 int reuse_addr, int flags, int perms) {
344 if (PARENT::connect_svc_handler
345 (svc_handler, remote_addr, timeout,
346 local_addr, reuse_addr, flags, perms) == -1) return -1;
347 SSL_clear (ssl_);
348 #if defined (ACE_WIN32)
349 // ACE_WIN32 is the only platform where ACE_HANDLE is not an int.
350 // See ace/config-lite.h for the typedefs.
351 SSL_set_fd (ssl_,
352 reinterpret_cast<int> (svc_handler->get_handle ()));
353 #else
354 SSL_set_fd (ssl_, svc_handler->get_handle ());
355 #endif /* ACE_WIN32 */
357 SSL_set_verify (ssl_, SSL_VERIFY_PEER, 0);
359 if (SSL_connect (ssl_) == -1
360 || SSL_shutdown (ssl_) == -1) return -1;
361 remote_addr_ = remote_addr;
362 return 0;
365 int AC_CLD_Connector::connect_svc_handler
366 (AC_Output_Handler *&svc_handler,
367 AC_Output_Handler *&sh_copy,
368 const ACE_SOCK_Connector::PEER_ADDR &remote_addr,
369 ACE_Time_Value *timeout,
370 const ACE_SOCK_Connector::PEER_ADDR &local_addr,
371 int reuse_addr, int flags, int perms) {
372 sh_copy = svc_handler;
373 return this->connect_svc_handler (svc_handler, remote_addr, timeout,
374 local_addr, reuse_addr, flags, perms);
377 int AC_CLD_Connector::reconnect () {
378 // Maximum number of times to retry connect.
379 const size_t MAX_RETRIES = 5;
380 ACE_Time_Value timeout (1);
381 size_t i;
382 for (i = 0; i < MAX_RETRIES; ++i) {
383 ACE_Synch_Options options (ACE_Synch_Options::USE_TIMEOUT,
384 timeout);
385 if (i > 0) ACE_OS::sleep (timeout);
386 if (this->connect (handler_, remote_addr_, options) == 0)
387 break;
388 timeout *= 2; // Exponential backoff.
390 return i == MAX_RETRIES ? -1 : 0;
393 /******************************************************/
395 int AC_Client_Logging_Daemon::init
396 (int argc, ACE_TCHAR *argv[]) {
397 u_short cld_port = ACE_DEFAULT_SERVICE_PORT;
398 u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT;
399 ACE_TCHAR sld_host[MAXHOSTNAMELEN];
400 ACE_OS::strcpy (sld_host, ACE_LOCALHOST);
402 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0);
403 get_opt.long_option (ACE_TEXT ("client_port"), 'p',
404 ACE_Get_Opt::ARG_REQUIRED);
405 get_opt.long_option (ACE_TEXT ("server_port"), 'r',
406 ACE_Get_Opt::ARG_REQUIRED);
407 get_opt.long_option (ACE_TEXT ("server_name"), 's',
408 ACE_Get_Opt::ARG_REQUIRED);
410 for (int c; (c = get_opt ()) != -1;)
411 switch (c) {
412 case 'p': // Client logging daemon acceptor port number.
413 cld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));
414 break;
415 case 'r': // Server logging daemon acceptor port number.
416 sld_port = static_cast<u_short> (ACE_OS::atoi (get_opt.opt_arg ()));
417 break;
418 case 's': // Server logging daemon hostname.
419 ACE_OS::strsncpy
420 (sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN);
421 break;
424 ACE_INET_Addr cld_addr (cld_port);
425 ACE_INET_Addr sld_addr (sld_port, sld_host);
427 if (acceptor_.open (cld_addr) == -1) return -1;
428 AC_Output_Handler *oh = &output_handler_;
429 if (connector_.connect (oh, sld_addr) == -1)
430 { acceptor_.close (); return -1; }
431 return 0;
434 int AC_Client_Logging_Daemon::fini ()
435 { return acceptor_.close (); }
437 ACE_FACTORY_DEFINE (AC_CLD, AC_Client_Logging_Daemon)