Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / netsvcs / lib / Client_Logging_Handler.cpp
blob9ee40d609e40bc00a42da5b6e0a19e1727c6d8a4
1 #include "ace/Get_Opt.h"
2 #include "ace/Acceptor.h"
3 #include "ace/SOCK_Connector.h"
4 #include "ace/SOCK_Acceptor.h"
5 #include "ace/SPIPE_Acceptor.h"
6 #include "ace/Log_Record.h"
7 #include "ace/OS_NS_stdio.h"
8 #include "ace/OS_NS_string.h"
9 #include "ace/OS_NS_sys_socket.h"
10 #include "ace/OS_NS_unistd.h"
11 #include "ace/CDR_Stream.h"
12 #include <memory>
13 #include "ace/SString.h"
14 #include "ace/INET_Addr.h"
15 #include "Client_Logging_Handler.h"
17 ACE_Client_Logging_Handler::ACE_Client_Logging_Handler (ACE_HANDLE output_handle)
18 : logging_output_ (output_handle)
20 // Register ourselves to receive SIGPIPE so we can attempt
21 // reconnections.
22 #if !defined (ACE_LACKS_UNIX_SIGNALS)
23 if (ACE_Reactor::instance ()->register_handler (SIGPIPE,
24 this) == -1)
25 ACE_ERROR ((LM_ERROR,
26 ACE_TEXT ("%n: %p\n"),
27 ACE_TEXT ("register_handler (SIGPIPE)")));
28 #endif /* !ACE_LACKS_UNIX_SIGNALS */
31 // This is called when a <send> to the logging server fails...
32 int
33 ACE_Client_Logging_Handler::handle_signal (int signum,
34 siginfo_t *,
35 ucontext_t *)
37 if (signum == SIGPIPE)
38 return 0;
39 else
40 return -1;
43 // This function is called every time a client connects to us.
44 int
45 ACE_Client_Logging_Handler::open (void *)
47 LOGGING_ADDR server_addr;
49 // Register ourselves to receive <handle_input> callbacks when
50 // clients send us logging records. Note that since we're really a
51 // Singleton, this->peer() will change after each connect, so we
52 // need to grab the value now.
53 if (ACE_Reactor::instance ()->register_handler
54 (this->peer ().get_handle (),
55 this,
56 ACE_Event_Handler::READ_MASK
57 | ACE_Event_Handler::EXCEPT_MASK) == -1)
58 ACE_ERROR_RETURN ((LM_ERROR,
59 ACE_TEXT ("%n: %p\n"),
60 ACE_TEXT ("register_handler")),
61 -1);
62 // Figure out what remote port we're really bound to.
63 if (this->peer ().get_remote_addr (server_addr) == -1)
64 ACE_ERROR_RETURN ((LM_ERROR,
65 ACE_TEXT ("%p\n"),
66 ACE_TEXT ("get_remote_addr")),
67 -1);
68 ACE_DEBUG ((LM_DEBUG,
69 ACE_TEXT ("Connected to client on handle %u\n"),
70 this->peer ().get_handle ()));
71 return 0;
74 /* VIRTUAL */ ACE_HANDLE
75 ACE_Client_Logging_Handler::get_handle () const
77 ACE_TRACE ("ACE_Client_Logging_Handler::get_handle");
79 ACE_ERROR ((LM_ERROR,
80 ACE_TEXT ("get_handle() shouldn't be called\n")));
82 return ACE_INVALID_HANDLE;
85 // Receive a logging record from an application.
86 int
87 ACE_Client_Logging_Handler::handle_input (ACE_HANDLE handle)
89 #if 0
90 ACE_DEBUG ((LM_DEBUG,
91 ACE_TEXT ("in ACE_Client_Logging_Handler::handle_input, handle = %u\n"),
92 handle));
93 #endif /* 0 */
95 if (handle == this->logging_output_)
96 // We're getting a message from the logging server!
97 ACE_ERROR_RETURN ((LM_ERROR,
98 ACE_TEXT ("Received data from server!\n")),
99 -1);
100 ACE_Log_Record log_record;
102 // We need to use the old two-read trick here since TCP sockets
103 // don't support framing natively. Allocate a message block for the
104 // payload; initially at least large enough to hold the header, but
105 // needs some room for alignment.
106 ACE_Message_Block *payload_p = 0;
107 ACE_Message_Block *header_p = 0;
108 ACE_NEW_RETURN (header_p,
109 ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE),
110 -1);
112 std::unique_ptr <ACE_Message_Block> header (header_p);
114 // Align the Message Block for a CDR stream
115 ACE_CDR::mb_align (header.get ());
117 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
118 // We're getting a logging message from a local application using
119 // STREAM pipes, which are nicely prioritized for us.
120 ACE_Str_Buf header_msg (header->wr_ptr (),
124 ACE_SPIPE_Stream spipe;
125 spipe.set_handle (handle);
126 int flags = 0;
128 // We've got a framed IPC mechanism, so we can just to a <recv>.
129 ssize_t result = spipe.recv (&header_msg,
130 (ACE_Str_Buf *) 0,
131 &flags);
133 if (result < 0 || header_msg.len == 0)
135 ACE_DEBUG ((LM_DEBUG,
136 ACE_TEXT ("client closing down unexpectedly\n")));
138 if (ACE_Reactor::instance ()->remove_handler
139 (handle,
140 ACE_Event_Handler::READ_MASK
141 | ACE_Event_Handler::EXCEPT_MASK
142 | ACE_Event_Handler::DONT_CALL) == -1)
143 ACE_ERROR_RETURN ((LM_ERROR,
144 ACE_TEXT ("%n: %p\n"),
145 ACE_TEXT ("remove_handler")),
146 -1);
147 spipe.close ();
148 return 0;
150 #else
151 // We're getting a logging message from a local application using
152 // sockets pipes, which are NOT prioritized for us.
154 ssize_t const count = ACE::recv_n (handle,
155 header->wr_ptr (),
157 switch (count)
159 // Handle shutdown and error cases.
160 default:
161 case -1:
162 case 0:
163 ACE_DEBUG ((LM_DEBUG,
164 ACE_TEXT ("client closing down\n")));
166 if (ACE_Reactor::instance ()->remove_handler
167 (handle,
168 ACE_Event_Handler::READ_MASK
169 | ACE_Event_Handler::EXCEPT_MASK
170 | ACE_Event_Handler::DONT_CALL) == -1)
171 ACE_ERROR_RETURN ((LM_ERROR,
172 ACE_TEXT ("%n: %p\n"),
173 ACE_TEXT ("remove_handler")),
175 if (handle == this->peer ().get_handle ())
176 this->peer ().close ();
177 else
178 ACE_OS::closesocket (handle);
179 // Release the memory to prevent a leak.
180 return 0;
181 /* NOTREACHED */
183 case 8:
184 // Just fall through in this case..
185 break;
187 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
189 // Reflect addition of 8 bytes for the header.
190 header->wr_ptr (8);
192 // Create a CDR stream to parse the 8-byte header.
193 ACE_InputCDR header_cdr (header.get ());
195 // Extract the byte-order and use helper methods to disambiguate
196 // octet, booleans, and chars.
197 ACE_CDR::Boolean byte_order;
198 if (!(header_cdr >> ACE_InputCDR::to_boolean (byte_order)))
200 ACE_ERROR ((LM_ERROR,
201 ACE_TEXT ("Can't extract byte_order\n")));
202 return 0;
205 // Set the byte-order on the stream...
206 header_cdr.reset_byte_order (byte_order);
208 // Extract the length
209 ACE_CDR::ULong length;
210 if (!(header_cdr >> length))
212 ACE_ERROR ((LM_ERROR,
213 ACE_TEXT ("Can't extract length\n")));
214 return 0;
217 ACE_NEW_RETURN (payload_p,
218 ACE_Message_Block (length),
219 -1);
220 std::unique_ptr <ACE_Message_Block> payload (payload_p);
222 // Ensure there's sufficient room for log record payload.
223 ACE_CDR::grow (payload.get (), 8 + ACE_CDR::MAX_ALIGNMENT + length);
225 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
226 ACE_Str_Buf payload_msg (payload->wr_ptr (),
228 length);
230 // We've got a framed IPC mechanism, so we can just do a <recv>.
231 result = spipe.recv ((ACE_Str_Buf *) 0,
232 &payload_msg,
233 &flags);
235 if (result < 0 || payload_msg.len != (int)length)
237 ACE_DEBUG ((LM_DEBUG,
238 ACE_TEXT ("%p\n"),
239 ACE_TEXT ("client closing down due to error\n")));
241 if (ACE_Reactor::instance ()->remove_handler
242 (handle,
243 ACE_Event_Handler::READ_MASK
244 | ACE_Event_Handler::EXCEPT_MASK
245 | ACE_Event_Handler::DONT_CALL) == -1)
246 ACE_ERROR_RETURN ((LM_ERROR,
247 ACE_TEXT ("%n: result %d, length %d %p\n"),
248 result,
249 payload_msg.len,
250 ACE_TEXT ("remove_handler")),
251 -1);
252 spipe.close ();
253 return 0;
255 #else
256 // Use <recv_n> to obtain the contents.
257 if (ACE::recv_n (handle, payload->wr_ptr (), length) <= 0)
259 ACE_ERROR ((LM_ERROR,
260 ACE_TEXT ("%p\n"),
261 ACE_TEXT ("recv_n()")));
263 if (ACE_Reactor::instance ()->remove_handler
264 (handle,
265 ACE_Event_Handler::READ_MASK
266 | ACE_Event_Handler::EXCEPT_MASK
267 | ACE_Event_Handler::DONT_CALL) == -1)
268 ACE_ERROR ((LM_ERROR,
269 ACE_TEXT ("%n: %p\n"),
270 ACE_TEXT ("remove_handler")));
272 ACE_OS::closesocket (handle);
273 return 0;
275 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
277 // Reflect additional bytes for the message.
278 payload->wr_ptr (length);
280 ACE_InputCDR payload_cdr (payload.get ());
281 payload_cdr.reset_byte_order (byte_order);
282 if (!(payload_cdr >> log_record)) // Finally extract the ACE_log_record.
284 ACE_ERROR ((LM_ERROR,
285 ACE_TEXT ("Can't extract log_record\n")));
286 return 0;
289 log_record.length (length);
291 // Forward the logging record to the server.
292 if (this->send (log_record) == -1)
293 ACE_ERROR ((LM_ERROR,
294 ACE_TEXT ("%p\n"),
295 ACE_TEXT ("send")));
296 return 0;
299 // Receive a logging record from an application send via a non-0
300 // MSG_BAND... This just calls handle_input().
302 ACE_Client_Logging_Handler::handle_exception (ACE_HANDLE handle)
304 return this->handle_input (handle);
307 // Called when object is removed from the ACE_Reactor
309 ACE_Client_Logging_Handler::close (u_long)
311 if (this->logging_output_ != ACE_STDERR)
312 ACE_OS::closesocket (this->logging_output_);
314 this->destroy ();
315 return 0;
319 ACE_Client_Logging_Handler::handle_output (ACE_HANDLE)
321 return 0;
324 // Encodes the contents of log_record object using network byte-order
325 // and sends it to the logging server.
328 ACE_Client_Logging_Handler::send (ACE_Log_Record &log_record)
330 ostream *orig_ostream = ACE_Log_Msg::instance ()->msg_ostream ();
332 // This logic must occur before we do the encode() on <log_record>
333 // since otherwise the values of the <log_record> fields will be in
334 // network byte order.
336 if (orig_ostream)
337 log_record.print (ACE_TEXT ("<localhost>"),
338 ACE_Log_Msg::instance ()->flags (),
339 *orig_ostream);
341 if (this->logging_output_ == ACE_STDERR)
343 log_record.print (ACE_TEXT ("<localhost>"),
344 ACE_Log_Msg::instance ()->flags (),
345 stderr);
347 else
349 // Serialize the log record using a CDR stream, allocate enough
350 // space for the complete <ACE_Log_Record>.
351 size_t const max_payload_size =
352 4 // type()
353 + 8 // timestamp
354 + 4 // process id
355 + 4 // data length
356 + ACE_Log_Record::MAXLOGMSGLEN // data
357 + ACE_CDR::MAX_ALIGNMENT; // padding;
359 // Insert contents of <log_record> into payload stream.
360 ACE_OutputCDR payload (max_payload_size);
361 if (!(payload << log_record))
363 ACE_ERROR ((LM_ERROR,
364 ACE_TEXT ("Can't insert log_record\n")));
365 return -1;
368 // Get the number of bytes used by the CDR stream.
369 ACE_CDR::ULong const length = payload.total_length ();
371 // Send a header so the receiver can determine the byte order and
372 // size of the incoming CDR stream.
373 ACE_OutputCDR header (ACE_CDR::MAX_ALIGNMENT + 8);
374 if (!(header << ACE_OutputCDR::from_boolean (ACE_CDR_BYTE_ORDER)))
376 ACE_ERROR ((LM_ERROR,
377 ACE_TEXT ("Can't insert byte order\n")));
378 return -1;
381 // Store the size of the payload that follows
382 if (!(header << ACE_CDR::ULong (length)))
384 ACE_ERROR ((LM_ERROR,
385 ACE_TEXT ("Can't insert length\n")));
386 return -1;
389 // Use an iovec to send both buffer and payload simultaneously.
390 iovec iov[2];
391 iov[0].iov_base = header.begin ()->rd_ptr ();
392 iov[0].iov_len = 8;
393 iov[1].iov_base = payload.begin ()->rd_ptr ();
394 iov[1].iov_len = length;
396 // We're running over sockets, so send header and payload
397 // efficiently using "gather-write".
398 if (ACE::sendv_n (this->logging_output_,iov, 2) == -1)
400 ACE_DEBUG ((LM_DEBUG,
401 "Something about the sendv_n() failed, so switch to stderr\n"));
403 if (ACE_Log_Msg::instance ()->msg_ostream () == 0)
404 // Switch over to logging to stderr for now. At some
405 // point, we'll improve the implementation to queue up the
406 // message, try to reestablish a connection, and then send
407 // the queued data once we've reconnect to the logging
408 // server. If you'd like to implement this functionality
409 // and contribute it back to ACE that would be great!
410 this->logging_output_ = ACE_STDERR;
412 else
413 ACE_DEBUG ((LM_DEBUG,
414 "Sent logging message %s successfully to Server Logging Daemon!\n",
415 log_record.priority_name (ACE_Log_Priority (log_record.type ()))));
418 return 0;
421 class ACE_Client_Logging_Acceptor : public ACE_Acceptor<ACE_Client_Logging_Handler, LOGGING_ACCEPTOR>
423 // = TITLE
424 // This factory creates connections with the
425 // <Server_Logging_Acceptor>.
427 // = DESCRIPTION
428 // This class contains the service-specific methods that can't
429 // easily be factored into the <ACE_Acceptor>.
430 public:
431 ACE_Client_Logging_Acceptor ();
432 // Default constructor.
434 protected:
435 // = Dynamic linking hooks.
436 virtual int init (int argc, ACE_TCHAR *argv[]);
437 // Called when service is linked.
439 virtual int fini ();
440 // Called when service is unlinked.
442 virtual int info (ACE_TCHAR **strp, size_t length) const;
443 // Called to determine info about the service.
445 virtual int make_svc_handler (ACE_Client_Logging_Handler *&sh);
446 // Factory that always returns the <handler_>.
448 // = Scheduling hooks.
449 virtual int suspend ();
450 virtual int resume ();
452 private:
453 int parse_args (int argc, ACE_TCHAR *argv[]);
454 // Parse svc.conf arguments.
456 const ACE_TCHAR *server_host_;
457 // Host where the logging server is located.
459 u_short server_port_;
460 // Port number where the logging server is listening for
461 // connections.
463 ACE_INET_Addr server_addr_;
464 // Address to connect to the server logging daemon.
466 ACE_INET_Addr local_addr_;
467 // Local IP/port number to use for the connection to the server logging
468 // daemon.
470 const ACE_TCHAR *logger_key_;
471 // Communication endpoint where the client logging daemon will
472 // listen for connections from clients.
474 ACE_Client_Logging_Handler *handler_;
475 // Pointer to the singleton handler that receives messages from
476 // clients and forwards to the server.
480 ACE_Client_Logging_Acceptor::fini ()
482 this->close ();
484 if (this->handler_ != 0)
485 this->handler_->close (0);
487 // Try to unlink the logger key so weird things don't happen if
488 // we're using STREAM pipes.
489 ACE_OS::unlink (this->logger_key_);
491 // This memory was allocated by <ACE_OS::strdup>.
492 ACE_OS::free ((void *) this->logger_key_);
494 ACE_OS::free ((void *) this->server_host_);
496 return 0;
500 ACE_Client_Logging_Acceptor::make_svc_handler (ACE_Client_Logging_Handler *&sh)
502 // Always return a pointer to the Singleton handler.
503 sh = this->handler_;
504 return 0;
508 ACE_Client_Logging_Acceptor::info (ACE_TCHAR **strp, size_t length) const
510 ACE_TCHAR buf[BUFSIZ];
512 ACE_OS::sprintf (buf, ACE_TEXT ("%d/%s %s"),
513 this->server_addr_.get_port_number (), "tcp",
514 "# client logging daemon\n");
516 if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
517 return -1;
518 else
519 ACE_OS::strncpy (*strp, buf, length);
520 return ACE_OS::strlen (buf);
523 ACE_Client_Logging_Acceptor::ACE_Client_Logging_Acceptor ()
524 : server_host_ (ACE_OS::strdup (ACE_DEFAULT_SERVER_HOST)),
525 server_port_ (ACE_DEFAULT_LOGGING_SERVER_PORT),
526 logger_key_ (ACE_OS::strdup (ACE_DEFAULT_LOGGER_KEY)),
527 handler_ (0)
532 ACE_Client_Logging_Acceptor::init (int argc, ACE_TCHAR *argv[])
534 // We'll log *our* error and debug messages to stderr!
535 if (ACE_LOG_MSG->open (ACE_TEXT ("Client Logging Service")) == -1)
536 ACE_ERROR_RETURN ((LM_ERROR,
537 ACE_TEXT ("Can't open ACE_Log_Msg\n")),
538 -1);
540 // Use the options hook to parse the command line arguments and set
541 // options.
542 this->parse_args (argc, argv);
544 // Try to unlink the logger key so weird things don't happen if
545 // we're using STREAM pipes.
546 ACE_OS::unlink (this->logger_key_);
548 // Initialize the acceptor endpoint.
549 if (this->open (LOGGING_ADDR (this->logger_key_)) == -1)
550 ACE_ERROR_RETURN ((LM_ERROR,
551 ACE_TEXT ("%p\n"),
552 this->logger_key_),
553 -1);
555 // Establish connection with the server.
556 ACE_SOCK_Connector con;
557 ACE_SOCK_Stream stream;
558 ACE_INET_Addr server_addr;
560 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
561 ACE_SPIPE_Addr lserver_addr;
563 // Figure out what local port we're really bound to.
564 if (this->acceptor ().get_local_addr (lserver_addr) == -1)
565 ACE_ERROR_RETURN ((LM_ERROR,
566 ACE_TEXT ("%p\n"),
567 ACE_TEXT ("get_local_addr")),
568 -1);
570 ACE_DEBUG ((LM_DEBUG,
571 ACE_TEXT ("Starting up Client Logging Daemon, ")
572 ACE_TEXT ("bounded to STREAM addr %s on handle %u\n"),
573 lserver_addr.get_path_name (),
574 this->acceptor ().get_handle ()));
575 #else
576 ACE_INET_Addr lserver_addr;
578 // Figure out what local port we're really bound to.
579 if (this->acceptor ().get_local_addr (lserver_addr) == -1)
580 ACE_ERROR_RETURN ((LM_ERROR,
581 ACE_TEXT ("%p\n"),
582 ACE_TEXT ("get_local_addr")),
583 -1);
585 ACE_DEBUG ((LM_DEBUG,
586 ACE_TEXT ("Starting up Client Logging Daemon, ")
587 ACE_TEXT ("bounded to local port %d on handle %u\n"),
588 lserver_addr.get_port_number (),
589 this->acceptor ().get_handle ()));
590 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
592 if (con.connect (stream,
593 this->server_addr_,
595 this->local_addr_) == -1)
597 ACE_ERROR ((LM_ERROR,
598 ACE_TEXT ("Can't connect to logging server %C on port %d: ")
599 ACE_TEXT ("%m, using stderr\n"),
600 this->server_addr_.get_host_name (),
601 this->server_addr_.get_port_number (),
602 ACE_ERRNO_GET));
604 if (ACE_Log_Msg::instance ()->msg_ostream () == 0)
605 // If we can't connect to the server then we'll send the logging
606 // messages to stderr.
607 stream.set_handle (ACE_STDERR);
609 else
611 // Figure out what remote port we're really bound to.
612 if (stream.get_remote_addr (server_addr) == -1)
613 ACE_ERROR_RETURN ((LM_ERROR,
614 ACE_TEXT ("%p\n"),
615 ACE_TEXT ("get_remote_addr")),
616 -1);
617 ACE_DEBUG ((LM_DEBUG,
618 ACE_TEXT ("Client Logging Daemon is connected to Server ")
619 ACE_TEXT ("Logging Daemon %C on port %d on handle %u\n"),
620 server_addr.get_host_name (),
621 server_addr.get_port_number (),
622 stream.get_handle ()));
625 // Create the Singleton <Client_Logging_Handler>.
626 ACE_NEW_RETURN (this->handler_,
627 ACE_Client_Logging_Handler (stream.get_handle ()),
628 -1);
629 return 0;
633 ACE_Client_Logging_Acceptor::parse_args (int argc, ACE_TCHAR *argv[])
635 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:k:p:l:"), 0);
636 ACE_TString local_addr_str;
638 for (int c; (c = get_opt ()) != -1; )
640 switch (c)
642 case 'h':
643 ACE_OS::free ((void *) this->server_host_);
644 this->server_host_ = ACE_OS::strdup (get_opt.opt_arg ());
645 break;
646 case 'k':
647 ACE_OS::free ((void *) this->logger_key_);
648 this->logger_key_ = ACE_OS::strdup (get_opt.opt_arg ());
649 break;
650 case 'p':
651 this->server_port_ = ACE_OS::atoi (get_opt.opt_arg ());
652 break;
653 case 'l':
654 local_addr_str = get_opt.opt_arg ();
655 break;
656 default:
657 ACE_ERROR_RETURN ((LM_ERROR,
658 ACE_TEXT ("%n:\n[-p server-port]\n")
659 ACE_TEXT ("[-l local-ip[:local-port]]\n")),
660 -1);
664 this->local_addr_.set ((u_short)0); // "any"
665 if (local_addr_str.length () > 0)
667 if (local_addr_str.rfind (ACE_TCHAR(':')) == ACE_TString::npos)
668 local_addr_str += ACE_TEXT (":0");
669 ACE_TCHAR *local_addr_cstr = local_addr_str.rep ();
670 if (-1 == local_addr_.string_to_addr (ACE_TEXT_ALWAYS_CHAR (local_addr_cstr)))
671 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), local_addr_cstr));
672 delete [] local_addr_cstr;
675 if (this->server_addr_.set (this->server_port_,
676 this->server_host_) == -1)
677 ACE_ERROR_RETURN ((LM_ERROR,
678 ACE_TEXT ("%p\n"),
679 this->server_host_),
680 -1);
681 return 0;
685 ACE_Client_Logging_Acceptor::suspend ()
687 // To be done...
688 return 0;
692 ACE_Client_Logging_Acceptor::resume ()
694 // To be done...
695 return 0;
698 // The following is a "Factory" used by the ACE_Service_Config and
699 // svc.conf file to dynamically initialize the state of the
700 // single-threaded logging server.
702 ACE_SVC_FACTORY_DEFINE (ACE_Client_Logging_Acceptor)