1 #include "ace/Get_Opt.h"
2 #include "ace/Acceptor.h"
3 #include "ace/SOCK_Connector.h"
4 #include "ace/SOCK_Acceptor.h"
5 #include "ace/SPIPE_Acceptor.h"
6 #include "ace/Log_Record.h"
7 #include "ace/OS_NS_stdio.h"
8 #include "ace/OS_NS_string.h"
9 #include "ace/OS_NS_sys_socket.h"
10 #include "ace/OS_NS_unistd.h"
11 #include "ace/CDR_Stream.h"
12 #include "ace/Auto_Ptr.h"
13 #include "ace/SString.h"
14 #include "ace/INET_Addr.h"
15 #include "Client_Logging_Handler.h"
17 ACE_Client_Logging_Handler::ACE_Client_Logging_Handler (ACE_HANDLE output_handle
)
18 : logging_output_ (output_handle
)
20 // Register ourselves to receive SIGPIPE so we can attempt
22 #if !defined (ACE_LACKS_UNIX_SIGNALS)
23 if (ACE_Reactor::instance ()->register_handler (SIGPIPE
,
26 ACE_TEXT ("%n: %p\n"),
27 ACE_TEXT ("register_handler (SIGPIPE)")));
28 #endif /* !ACE_LACKS_UNIX_SIGNALS */
31 // This is called when a <send> to the logging server fails...
33 ACE_Client_Logging_Handler::handle_signal (int signum
,
37 if (signum
== SIGPIPE
)
43 // This function is called every time a client connects to us.
45 ACE_Client_Logging_Handler::open (void *)
47 LOGGING_ADDR server_addr
;
49 // Register ourselves to receive <handle_input> callbacks when
50 // clients send us logging records. Note that since we're really a
51 // Singleton, this->peer() will change after each connect, so we
52 // need to grab the value now.
53 if (ACE_Reactor::instance ()->register_handler
54 (this->peer ().get_handle (),
56 ACE_Event_Handler::READ_MASK
57 | ACE_Event_Handler::EXCEPT_MASK
) == -1)
58 ACE_ERROR_RETURN ((LM_ERROR
,
59 ACE_TEXT ("%n: %p\n"),
60 ACE_TEXT ("register_handler")),
62 // Figure out what remote port we're really bound to.
63 if (this->peer ().get_remote_addr (server_addr
) == -1)
64 ACE_ERROR_RETURN ((LM_ERROR
,
66 ACE_TEXT ("get_remote_addr")),
69 ACE_TEXT ("Connected to client on handle %u\n"),
70 this->peer ().get_handle ()));
74 /* VIRTUAL */ ACE_HANDLE
75 ACE_Client_Logging_Handler::get_handle (void) const
77 ACE_TRACE ("ACE_Client_Logging_Handler::get_handle");
80 ACE_TEXT ("get_handle() shouldn't be called\n")));
82 return ACE_INVALID_HANDLE
;
85 // Receive a logging record from an application.
87 ACE_Client_Logging_Handler::handle_input (ACE_HANDLE handle
)
91 ACE_TEXT ("in ACE_Client_Logging_Handler::handle_input, handle = %u\n"),
95 if (handle
== this->logging_output_
)
96 // We're getting a message from the logging server!
97 ACE_ERROR_RETURN ((LM_ERROR
,
98 ACE_TEXT ("Received data from server!\n")),
100 ACE_Log_Record log_record
;
102 // We need to use the old two-read trick here since TCP sockets
103 // don't support framing natively. Allocate a message block for the
104 // payload; initially at least large enough to hold the header, but
105 // needs some room for alignment.
106 ACE_Message_Block
*payload_p
= 0;
107 ACE_Message_Block
*header_p
= 0;
108 ACE_NEW_RETURN (header_p
,
109 ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE
),
112 #if defined (ACE_HAS_CPP11)
113 std::unique_ptr
<ACE_Message_Block
> header (header_p
);
115 auto_ptr
<ACE_Message_Block
> header (header_p
);
116 #endif /* ACE_HAS_CPP11 */
118 // Align the Message Block for a CDR stream
119 ACE_CDR::mb_align (header
.get ());
121 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
122 // We're getting a logging message from a local application using
123 // STREAM pipes, which are nicely prioritized for us.
124 ACE_Str_Buf
header_msg (header
->wr_ptr (),
128 ACE_SPIPE_Stream spipe
;
129 spipe
.set_handle (handle
);
132 // We've got a framed IPC mechanism, so we can just to a <recv>.
133 ssize_t result
= spipe
.recv (&header_msg
,
137 if (result
< 0 || header_msg
.len
== 0)
139 ACE_DEBUG ((LM_DEBUG
,
140 ACE_TEXT ("client closing down unexpectedly\n")));
142 if (ACE_Reactor::instance ()->remove_handler
144 ACE_Event_Handler::READ_MASK
145 | ACE_Event_Handler::EXCEPT_MASK
146 | ACE_Event_Handler::DONT_CALL
) == -1)
147 ACE_ERROR_RETURN ((LM_ERROR
,
148 ACE_TEXT ("%n: %p\n"),
149 ACE_TEXT ("remove_handler")),
155 // We're getting a logging message from a local application using
156 // sockets pipes, which are NOT prioritized for us.
158 ssize_t
const count
= ACE::recv_n (handle
,
163 // Handle shutdown and error cases.
167 ACE_DEBUG ((LM_DEBUG
,
168 ACE_TEXT ("client closing down\n")));
170 if (ACE_Reactor::instance ()->remove_handler
172 ACE_Event_Handler::READ_MASK
173 | ACE_Event_Handler::EXCEPT_MASK
174 | ACE_Event_Handler::DONT_CALL
) == -1)
175 ACE_ERROR_RETURN ((LM_ERROR
,
176 ACE_TEXT ("%n: %p\n"),
177 ACE_TEXT ("remove_handler")),
179 if (handle
== this->peer ().get_handle ())
180 this->peer ().close ();
182 ACE_OS::closesocket (handle
);
183 // Release the memory to prevent a leak.
188 // Just fall through in this case..
191 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
193 // Reflect addition of 8 bytes for the header.
196 // Create a CDR stream to parse the 8-byte header.
197 ACE_InputCDR
header_cdr (header
.get ());
199 // Extract the byte-order and use helper methods to disambiguate
200 // octet, booleans, and chars.
201 ACE_CDR::Boolean byte_order
;
202 if (!(header_cdr
>> ACE_InputCDR::to_boolean (byte_order
)))
204 ACE_ERROR ((LM_ERROR
,
205 ACE_TEXT ("Can't extract byte_order\n")));
209 // Set the byte-order on the stream...
210 header_cdr
.reset_byte_order (byte_order
);
212 // Extract the length
213 ACE_CDR::ULong length
;
214 if (!(header_cdr
>> length
))
216 ACE_ERROR ((LM_ERROR
,
217 ACE_TEXT ("Can't extract length\n")));
221 ACE_NEW_RETURN (payload_p
,
222 ACE_Message_Block (length
),
224 #if defined (ACE_HAS_CPP11)
225 std::unique_ptr
<ACE_Message_Block
> payload (payload_p
);
227 auto_ptr
<ACE_Message_Block
> payload (payload_p
);
228 #endif /* ACE_HAS_CPP11 */
230 // Ensure there's sufficient room for log record payload.
231 ACE_CDR::grow (payload
.get (), 8 + ACE_CDR::MAX_ALIGNMENT
+ length
);
233 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
234 ACE_Str_Buf
payload_msg (payload
->wr_ptr (),
238 // We've got a framed IPC mechanism, so we can just do a <recv>.
239 result
= spipe
.recv ((ACE_Str_Buf
*) 0,
243 if (result
< 0 || payload_msg
.len
!= (int)length
)
245 ACE_DEBUG ((LM_DEBUG
,
247 ACE_TEXT ("client closing down due to error\n")));
249 if (ACE_Reactor::instance ()->remove_handler
251 ACE_Event_Handler::READ_MASK
252 | ACE_Event_Handler::EXCEPT_MASK
253 | ACE_Event_Handler::DONT_CALL
) == -1)
254 ACE_ERROR_RETURN ((LM_ERROR
,
255 ACE_TEXT ("%n: result %d, length %d %p\n"),
258 ACE_TEXT ("remove_handler")),
264 // Use <recv_n> to obtain the contents.
265 if (ACE::recv_n (handle
, payload
->wr_ptr (), length
) <= 0)
267 ACE_ERROR ((LM_ERROR
,
269 ACE_TEXT ("recv_n()")));
271 if (ACE_Reactor::instance ()->remove_handler
273 ACE_Event_Handler::READ_MASK
274 | ACE_Event_Handler::EXCEPT_MASK
275 | ACE_Event_Handler::DONT_CALL
) == -1)
276 ACE_ERROR ((LM_ERROR
,
277 ACE_TEXT ("%n: %p\n"),
278 ACE_TEXT ("remove_handler")));
280 ACE_OS::closesocket (handle
);
283 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
285 // Reflect additional bytes for the message.
286 payload
->wr_ptr (length
);
288 ACE_InputCDR
payload_cdr (payload
.get ());
289 payload_cdr
.reset_byte_order (byte_order
);
290 if (!(payload_cdr
>> log_record
)) // Finally extract the ACE_log_record.
292 ACE_ERROR ((LM_ERROR
,
293 ACE_TEXT ("Can't extract log_record\n")));
297 log_record
.length (length
);
299 // Forward the logging record to the server.
300 if (this->send (log_record
) == -1)
301 ACE_ERROR ((LM_ERROR
,
307 // Receive a logging record from an application send via a non-0
308 // MSG_BAND... This just calls handle_input().
310 ACE_Client_Logging_Handler::handle_exception (ACE_HANDLE handle
)
312 return this->handle_input (handle
);
315 // Called when object is removed from the ACE_Reactor
317 ACE_Client_Logging_Handler::close (u_long
)
319 if (this->logging_output_
!= ACE_STDERR
)
320 ACE_OS::closesocket (this->logging_output_
);
327 ACE_Client_Logging_Handler::handle_output (ACE_HANDLE
)
332 // Encodes the contents of log_record object using network byte-order
333 // and sends it to the logging server.
336 ACE_Client_Logging_Handler::send (ACE_Log_Record
&log_record
)
338 ostream
*orig_ostream
= ACE_Log_Msg::instance ()->msg_ostream ();
340 // This logic must occur before we do the encode() on <log_record>
341 // since otherwise the values of the <log_record> fields will be in
342 // network byte order.
345 log_record
.print (ACE_TEXT ("<localhost>"),
346 ACE_Log_Msg::instance ()->flags (),
349 if (this->logging_output_
== ACE_STDERR
)
351 log_record
.print (ACE_TEXT ("<localhost>"),
352 ACE_Log_Msg::instance ()->flags (),
357 // Serialize the log record using a CDR stream, allocate enough
358 // space for the complete <ACE_Log_Record>.
359 size_t const max_payload_size
=
364 + ACE_Log_Record::MAXLOGMSGLEN
// data
365 + ACE_CDR::MAX_ALIGNMENT
; // padding;
367 // Insert contents of <log_record> into payload stream.
368 ACE_OutputCDR
payload (max_payload_size
);
369 if (!(payload
<< log_record
))
371 ACE_ERROR ((LM_ERROR
,
372 ACE_TEXT ("Can't insert log_record\n")));
376 // Get the number of bytes used by the CDR stream.
377 ACE_CDR::ULong
const length
= payload
.total_length ();
379 // Send a header so the receiver can determine the byte order and
380 // size of the incoming CDR stream.
381 ACE_OutputCDR
header (ACE_CDR::MAX_ALIGNMENT
+ 8);
382 if (!(header
<< ACE_OutputCDR::from_boolean (ACE_CDR_BYTE_ORDER
)))
384 ACE_ERROR ((LM_ERROR
,
385 ACE_TEXT ("Can't insert byte order\n")));
389 // Store the size of the payload that follows
390 if (!(header
<< ACE_CDR::ULong (length
)))
392 ACE_ERROR ((LM_ERROR
,
393 ACE_TEXT ("Can't insert length\n")));
397 // Use an iovec to send both buffer and payload simultaneously.
399 iov
[0].iov_base
= header
.begin ()->rd_ptr ();
401 iov
[1].iov_base
= payload
.begin ()->rd_ptr ();
402 iov
[1].iov_len
= length
;
404 // We're running over sockets, so send header and payload
405 // efficiently using "gather-write".
406 if (ACE::sendv_n (this->logging_output_
,iov
, 2) == -1)
408 ACE_DEBUG ((LM_DEBUG
,
409 "Something about the sendv_n() failed, so switch to stderr\n"));
411 if (ACE_Log_Msg::instance ()->msg_ostream () == 0)
412 // Switch over to logging to stderr for now. At some
413 // point, we'll improve the implementation to queue up the
414 // message, try to reestablish a connection, and then send
415 // the queued data once we've reconnect to the logging
416 // server. If you'd like to implement this functionality
417 // and contribute it back to ACE that would be great!
418 this->logging_output_
= ACE_STDERR
;
421 ACE_DEBUG ((LM_DEBUG
,
422 "Sent logging message %s successfully to Server Logging Daemon!\n",
423 log_record
.priority_name (ACE_Log_Priority (log_record
.type ()))));
429 class ACE_Client_Logging_Acceptor
: public ACE_Acceptor
<ACE_Client_Logging_Handler
, LOGGING_ACCEPTOR
>
432 // This factory creates connections with the
433 // <Server_Logging_Acceptor>.
436 // This class contains the service-specific methods that can't
437 // easily be factored into the <ACE_Acceptor>.
439 ACE_Client_Logging_Acceptor (void);
440 // Default constructor.
443 // = Dynamic linking hooks.
444 virtual int init (int argc
, ACE_TCHAR
*argv
[]);
445 // Called when service is linked.
447 virtual int fini (void);
448 // Called when service is unlinked.
450 virtual int info (ACE_TCHAR
**strp
, size_t length
) const;
451 // Called to determine info about the service.
453 virtual int make_svc_handler (ACE_Client_Logging_Handler
*&sh
);
454 // Factory that always returns the <handler_>.
456 // = Scheduling hooks.
457 virtual int suspend (void);
458 virtual int resume (void);
461 int parse_args (int argc
, ACE_TCHAR
*argv
[]);
462 // Parse svc.conf arguments.
464 const ACE_TCHAR
*server_host_
;
465 // Host where the logging server is located.
467 u_short server_port_
;
468 // Port number where the logging server is listening for
471 ACE_INET_Addr server_addr_
;
472 // Address to connect to the server logging daemon.
474 ACE_INET_Addr local_addr_
;
475 // Local IP/port number to use for the connection to the server logging
478 const ACE_TCHAR
*logger_key_
;
479 // Communication endpoint where the client logging daemon will
480 // listen for connections from clients.
482 ACE_Client_Logging_Handler
*handler_
;
483 // Pointer to the singleton handler that receives messages from
484 // clients and forwards to the server.
488 ACE_Client_Logging_Acceptor::fini (void)
492 if (this->handler_
!= 0)
493 this->handler_
->close (0);
495 // Try to unlink the logger key so weird things don't happen if
496 // we're using STREAM pipes.
497 ACE_OS::unlink (this->logger_key_
);
499 // This memory was allocated by <ACE_OS::strdup>.
500 ACE_OS::free ((void *) this->logger_key_
);
502 ACE_OS::free ((void *) this->server_host_
);
508 ACE_Client_Logging_Acceptor::make_svc_handler (ACE_Client_Logging_Handler
*&sh
)
510 // Always return a pointer to the Singleton handler.
516 ACE_Client_Logging_Acceptor::info (ACE_TCHAR
**strp
, size_t length
) const
518 ACE_TCHAR buf
[BUFSIZ
];
520 ACE_OS::sprintf (buf
, ACE_TEXT ("%d/%s %s"),
521 this->server_addr_
.get_port_number (), "tcp",
522 "# client logging daemon\n");
524 if (*strp
== 0 && (*strp
= ACE_OS::strdup (buf
)) == 0)
527 ACE_OS::strncpy (*strp
, buf
, length
);
528 return ACE_OS::strlen (buf
);
531 ACE_Client_Logging_Acceptor::ACE_Client_Logging_Acceptor (void)
532 : server_host_ (ACE_OS::strdup (ACE_DEFAULT_SERVER_HOST
)),
533 server_port_ (ACE_DEFAULT_LOGGING_SERVER_PORT
),
534 logger_key_ (ACE_OS::strdup (ACE_DEFAULT_LOGGER_KEY
)),
540 ACE_Client_Logging_Acceptor::init (int argc
, ACE_TCHAR
*argv
[])
542 // We'll log *our* error and debug messages to stderr!
543 if (ACE_LOG_MSG
->open (ACE_TEXT ("Client Logging Service")) == -1)
544 ACE_ERROR_RETURN ((LM_ERROR
,
545 ACE_TEXT ("Can't open ACE_Log_Msg\n")),
548 // Use the options hook to parse the command line arguments and set
550 this->parse_args (argc
, argv
);
552 // Try to unlink the logger key so weird things don't happen if
553 // we're using STREAM pipes.
554 ACE_OS::unlink (this->logger_key_
);
556 // Initialize the acceptor endpoint.
557 if (this->open (LOGGING_ADDR (this->logger_key_
)) == -1)
558 ACE_ERROR_RETURN ((LM_ERROR
,
563 // Establish connection with the server.
564 ACE_SOCK_Connector con
;
565 ACE_SOCK_Stream stream
;
566 ACE_INET_Addr server_addr
;
568 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
569 ACE_SPIPE_Addr lserver_addr
;
571 // Figure out what local port we're really bound to.
572 if (this->acceptor ().get_local_addr (lserver_addr
) == -1)
573 ACE_ERROR_RETURN ((LM_ERROR
,
575 ACE_TEXT ("get_local_addr")),
578 ACE_DEBUG ((LM_DEBUG
,
579 ACE_TEXT ("Starting up Client Logging Daemon, ")
580 ACE_TEXT ("bounded to STREAM addr %s on handle %u\n"),
581 lserver_addr
.get_path_name (),
582 this->acceptor ().get_handle ()));
584 ACE_INET_Addr lserver_addr
;
586 // Figure out what local port we're really bound to.
587 if (this->acceptor ().get_local_addr (lserver_addr
) == -1)
588 ACE_ERROR_RETURN ((LM_ERROR
,
590 ACE_TEXT ("get_local_addr")),
593 ACE_DEBUG ((LM_DEBUG
,
594 ACE_TEXT ("Starting up Client Logging Daemon, ")
595 ACE_TEXT ("bounded to local port %d on handle %u\n"),
596 lserver_addr
.get_port_number (),
597 this->acceptor ().get_handle ()));
598 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
600 if (con
.connect (stream
,
603 this->local_addr_
) == -1)
605 ACE_ERROR ((LM_ERROR
,
606 ACE_TEXT ("Can't connect to logging server %C on port %d: ")
607 ACE_TEXT ("%m, using stderr\n"),
608 this->server_addr_
.get_host_name (),
609 this->server_addr_
.get_port_number (),
612 if (ACE_Log_Msg::instance ()->msg_ostream () == 0)
613 // If we can't connect to the server then we'll send the logging
614 // messages to stderr.
615 stream
.set_handle (ACE_STDERR
);
619 // Figure out what remote port we're really bound to.
620 if (stream
.get_remote_addr (server_addr
) == -1)
621 ACE_ERROR_RETURN ((LM_ERROR
,
623 ACE_TEXT ("get_remote_addr")),
625 ACE_DEBUG ((LM_DEBUG
,
626 ACE_TEXT ("Client Logging Daemon is connected to Server ")
627 ACE_TEXT ("Logging Daemon %C on port %d on handle %u\n"),
628 server_addr
.get_host_name (),
629 server_addr
.get_port_number (),
630 stream
.get_handle ()));
633 // Create the Singleton <Client_Logging_Handler>.
634 ACE_NEW_RETURN (this->handler_
,
635 ACE_Client_Logging_Handler (stream
.get_handle ()),
641 ACE_Client_Logging_Acceptor::parse_args (int argc
, ACE_TCHAR
*argv
[])
643 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("h:k:p:l:"), 0);
644 ACE_TString local_addr_str
;
646 for (int c
; (c
= get_opt ()) != -1; )
651 ACE_OS::free ((void *) this->server_host_
);
652 this->server_host_
= ACE_OS::strdup (get_opt
.opt_arg ());
655 ACE_OS::free ((void *) this->logger_key_
);
656 this->logger_key_
= ACE_OS::strdup (get_opt
.opt_arg ());
659 this->server_port_
= ACE_OS::atoi (get_opt
.opt_arg ());
662 local_addr_str
= get_opt
.opt_arg ();
665 ACE_ERROR_RETURN ((LM_ERROR
,
666 ACE_TEXT ("%n:\n[-p server-port]\n")
667 ACE_TEXT ("[-l local-ip[:local-port]]\n")),
672 this->local_addr_
.set ((u_short
)0); // "any"
673 if (local_addr_str
.length () > 0)
675 if (local_addr_str
.rfind (ACE_TCHAR(':')) == ACE_TString::npos
)
676 local_addr_str
+= ACE_TEXT (":0");
677 ACE_TCHAR
*local_addr_cstr
= local_addr_str
.rep ();
678 if (-1 == local_addr_
.string_to_addr (ACE_TEXT_ALWAYS_CHAR (local_addr_cstr
)))
679 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), local_addr_cstr
));
680 delete [] local_addr_cstr
;
683 if (this->server_addr_
.set (this->server_port_
,
684 this->server_host_
) == -1)
685 ACE_ERROR_RETURN ((LM_ERROR
,
693 ACE_Client_Logging_Acceptor::suspend (void)
700 ACE_Client_Logging_Acceptor::resume (void)
706 // The following is a "Factory" used by the ACE_Service_Config and
707 // svc.conf file to dynamically initialize the state of the
708 // single-threaded logging server.
710 ACE_SVC_FACTORY_DEFINE (ACE_Client_Logging_Acceptor
)