1 #include "ace/Get_Opt.h"
2 #include "ace/Acceptor.h"
3 #include "ace/SOCK_Connector.h"
4 #include "ace/SOCK_Acceptor.h"
5 #include "ace/SPIPE_Acceptor.h"
6 #include "ace/Log_Record.h"
7 #include "ace/OS_NS_stdio.h"
8 #include "ace/OS_NS_string.h"
9 #include "ace/OS_NS_sys_socket.h"
10 #include "ace/OS_NS_unistd.h"
11 #include "ace/CDR_Stream.h"
13 #include "ace/SString.h"
14 #include "ace/INET_Addr.h"
15 #include "Client_Logging_Handler.h"
// Constructor: stores <output_handle> in <logging_output_> and, unless
// ACE_LACKS_UNIX_SIGNALS is defined, registers with the ACE_Reactor for
// SIGPIPE.
// NOTE(review): several lines of the original body (braces, the error
// macro that wraps the ACE_TEXT arguments) are not visible in this extract.
17 ACE_Client_Logging_Handler::ACE_Client_Logging_Handler (ACE_HANDLE output_handle
)
18 : logging_output_ (output_handle
)
20 // Register ourselves to receive SIGPIPE so we can attempt
// NOTE(review): the continuation of the comment above is missing from this extract.
22 #if !defined (ACE_LACKS_UNIX_SIGNALS)
23 if (ACE_Reactor::instance ()->register_handler (SIGPIPE
,
26 ACE_TEXT ("%n: %p\n"),
27 ACE_TEXT ("register_handler (SIGPIPE)")));
28 #endif /* !ACE_LACKS_UNIX_SIGNALS */
31 // This is called when a <send> to the logging server fails...
// Signal hook: the visible code tests whether <signum> equals SIGPIPE.
// NOTE(review): the remaining parameters, the branch bodies and the return
// statements are not visible in this extract.
33 ACE_Client_Logging_Handler::handle_signal (int signum
,
37 if (signum
== SIGPIPE
)
43 // This function is called every time a client connects to us.
// Visible flow: register the peer's handle with ACE_Reactor::instance ()
// for READ and EXCEPT events, query the peer's remote address into
// <server_addr>, and log the handle that was connected. Both failure
// checks report through ACE_ERROR_RETURN.
// NOTE(review): the values returned by ACE_ERROR_RETURN and the final
// return statement are on lines missing from this extract.
45 ACE_Client_Logging_Handler::open (void *)
47 LOGGING_ADDR server_addr
;
49 // Register ourselves to receive <handle_input> callbacks when
50 // clients send us logging records. Note that since we're really a
51 // Singleton, this->peer() will change after each connect, so we
52 // need to grab the value now.
53 if (ACE_Reactor::instance ()->register_handler
54 (this->peer ().get_handle (),
56 ACE_Event_Handler::READ_MASK
57 | ACE_Event_Handler::EXCEPT_MASK
) == -1)
58 ACE_ERROR_RETURN ((LM_ERROR
,
59 ACE_TEXT ("%n: %p\n"),
60 ACE_TEXT ("register_handler")),
62 // Figure out what remote port we're really bound to.
63 if (this->peer ().get_remote_addr (server_addr
) == -1)
64 ACE_ERROR_RETURN ((LM_ERROR
,
66 ACE_TEXT ("get_remote_addr")),
69 ACE_TEXT ("Connected to client on handle %u\n"),
70 this->peer ().get_handle ()));
// Always returns ACE_INVALID_HANDLE; the visible message text states that
// get_handle() shouldn't be called on this handler.
// NOTE(review): the logging macro that wraps the message is on lines
// missing from this extract.
74 /* VIRTUAL */ ACE_HANDLE
75 ACE_Client_Logging_Handler::get_handle () const
77 ACE_TRACE ("ACE_Client_Logging_Handler::get_handle");
80 ACE_TEXT ("get_handle() shouldn't be called\n")));
82 return ACE_INVALID_HANDLE
;
85 // Receive a logging record from an application.
// Visible flow:
//   1. Input arriving on <logging_output_> is reported as an error.
//   2. A header ACE_Message_Block is allocated, CDR-aligned and filled
//      (via ACE_SPIPE_Stream::recv when ACE_HAS_STREAM_LOG_MSG_IPC == 1,
//      otherwise via ACE::recv_n).
//   3. The byte order and payload <length> are extracted from the header.
//   4. A payload block is allocated and grown, then filled the same way.
//   5. An ACE_Log_Record is demarshaled from the payload and forwarded
//      with send().
// On receive failures the handler is removed from the reactor.
// NOTE(review): many original lines (braces, #else, return statements,
// macro arguments) are missing from this extract, so the exact error-path
// return values cannot be confirmed from here.
87 ACE_Client_Logging_Handler::handle_input (ACE_HANDLE handle
)
91 ACE_TEXT ("in ACE_Client_Logging_Handler::handle_input, handle = %u\n"),
95 if (handle
== this->logging_output_
)
96 // We're getting a message from the logging server!
97 ACE_ERROR_RETURN ((LM_ERROR
,
98 ACE_TEXT ("Received data from server!\n")),
100 ACE_Log_Record log_record
;
102 // We need to use the old two-read trick here since TCP sockets
103 // don't support framing natively. Allocate a message block for the
104 // payload; initially at least large enough to hold the header, but
105 // needs some room for alignment.
106 ACE_Message_Block
*payload_p
= 0;
107 ACE_Message_Block
*header_p
= 0;
108 ACE_NEW_RETURN (header_p
,
109 ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE
),
// The unique_ptr takes ownership of the header block.
112 std::unique_ptr
<ACE_Message_Block
> header (header_p
);
114 // Align the Message Block for a CDR stream
115 ACE_CDR::mb_align (header
.get ());
117 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
118 // We're getting a logging message from a local application using
119 // STREAM pipes, which are nicely prioritized for us.
120 ACE_Str_Buf
header_msg (header
->wr_ptr (),
124 ACE_SPIPE_Stream spipe
;
125 spipe
.set_handle (handle
);
128 // We've got a framed IPC mechanism, so we can just do a <recv>.
129 ssize_t result
= spipe
.recv (&header_msg
,
133 if (result
< 0 || header_msg
.len
== 0)
135 ACE_DEBUG ((LM_DEBUG
,
136 ACE_TEXT ("client closing down unexpectedly\n")));
138 if (ACE_Reactor::instance ()->remove_handler
140 ACE_Event_Handler::READ_MASK
141 | ACE_Event_Handler::EXCEPT_MASK
142 | ACE_Event_Handler::DONT_CALL
) == -1)
143 ACE_ERROR_RETURN ((LM_ERROR
,
144 ACE_TEXT ("%n: %p\n"),
145 ACE_TEXT ("remove_handler")),
151 // We're getting a logging message from a local application using
152 // sockets pipes, which are NOT prioritized for us.
154 ssize_t
const count
= ACE::recv_n (handle
,
159 // Handle shutdown and error cases.
163 ACE_DEBUG ((LM_DEBUG
,
164 ACE_TEXT ("client closing down\n")));
166 if (ACE_Reactor::instance ()->remove_handler
168 ACE_Event_Handler::READ_MASK
169 | ACE_Event_Handler::EXCEPT_MASK
170 | ACE_Event_Handler::DONT_CALL
) == -1)
171 ACE_ERROR_RETURN ((LM_ERROR
,
172 ACE_TEXT ("%n: %p\n"),
173 ACE_TEXT ("remove_handler")),
// Close through the peer object when <handle> is the peer's own handle.
// NOTE(review): the else/brace structure around the closesocket() call
// below is not visible in this extract.
175 if (handle
== this->peer ().get_handle ())
176 this->peer ().close ();
178 ACE_OS::closesocket (handle
);
179 // Release the memory to prevent a leak.
184 // Just fall through in this case..
187 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
189 // Reflect addition of 8 bytes for the header.
192 // Create a CDR stream to parse the 8-byte header.
193 ACE_InputCDR
header_cdr (header
.get ());
195 // Extract the byte-order and use helper methods to disambiguate
196 // octet, booleans, and chars.
197 ACE_CDR::Boolean byte_order
;
198 if (!(header_cdr
>> ACE_InputCDR::to_boolean (byte_order
)))
200 ACE_ERROR ((LM_ERROR
,
201 ACE_TEXT ("Can't extract byte_order\n")));
205 // Set the byte-order on the stream...
206 header_cdr
.reset_byte_order (byte_order
);
208 // Extract the length
209 ACE_CDR::ULong length
;
210 if (!(header_cdr
>> length
))
212 ACE_ERROR ((LM_ERROR
,
213 ACE_TEXT ("Can't extract length\n")));
// NOTE(review): <length> comes from the received header and is used as the
// allocation size below; no upper-bound check is visible in this extract
// -- confirm whether one exists on the missing lines.
217 ACE_NEW_RETURN (payload_p
,
218 ACE_Message_Block (length
),
// The unique_ptr takes ownership of the payload block.
220 std::unique_ptr
<ACE_Message_Block
> payload (payload_p
);
222 // Ensure there's sufficient room for log record payload.
223 ACE_CDR::grow (payload
.get (), 8 + ACE_CDR::MAX_ALIGNMENT
+ length
);
225 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
226 ACE_Str_Buf
payload_msg (payload
->wr_ptr (),
230 // We've got a framed IPC mechanism, so we can just do a <recv>.
231 result
= spipe
.recv ((ACE_Str_Buf
*) 0,
235 if (result
< 0 || payload_msg
.len
!= (int)length
)
237 ACE_DEBUG ((LM_DEBUG
,
239 ACE_TEXT ("client closing down due to error\n")));
241 if (ACE_Reactor::instance ()->remove_handler
243 ACE_Event_Handler::READ_MASK
244 | ACE_Event_Handler::EXCEPT_MASK
245 | ACE_Event_Handler::DONT_CALL
) == -1)
246 ACE_ERROR_RETURN ((LM_ERROR
,
247 ACE_TEXT ("%n: result %d, length %d %p\n"),
250 ACE_TEXT ("remove_handler")),
256 // Use <recv_n> to obtain the contents.
257 if (ACE::recv_n (handle
, payload
->wr_ptr (), length
) <= 0)
259 ACE_ERROR ((LM_ERROR
,
261 ACE_TEXT ("recv_n()")));
263 if (ACE_Reactor::instance ()->remove_handler
265 ACE_Event_Handler::READ_MASK
266 | ACE_Event_Handler::EXCEPT_MASK
267 | ACE_Event_Handler::DONT_CALL
) == -1)
268 ACE_ERROR ((LM_ERROR
,
269 ACE_TEXT ("%n: %p\n"),
270 ACE_TEXT ("remove_handler")));
272 ACE_OS::closesocket (handle
);
275 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
277 // Reflect additional bytes for the message.
278 payload
->wr_ptr (length
);
// Parse the payload with the byte order announced in the header.
280 ACE_InputCDR
payload_cdr (payload
.get ());
281 payload_cdr
.reset_byte_order (byte_order
);
282 if (!(payload_cdr
>> log_record
)) // Finally extract the ACE_Log_Record.
284 ACE_ERROR ((LM_ERROR
,
285 ACE_TEXT ("Can't extract log_record\n")));
289 log_record
.length (length
);
291 // Forward the logging record to the server.
292 if (this->send (log_record
) == -1)
293 ACE_ERROR ((LM_ERROR
,
299 // Receive a logging record from an application sent via a non-0
300 // MSG_BAND... This just calls handle_input().
// Returns whatever handle_input (handle) returns.
302 ACE_Client_Logging_Handler::handle_exception (ACE_HANDLE handle
)
304 return this->handle_input (handle
);
307 // Called when object is removed from the ACE_Reactor
// Closes <logging_output_> with ACE_OS::closesocket() unless it is
// ACE_STDERR. The u_long argument is unnamed and unused in the visible code.
// NOTE(review): any further cleanup and the return statement are on lines
// missing from this extract.
309 ACE_Client_Logging_Handler::close (u_long
)
311 if (this->logging_output_
!= ACE_STDERR
)
312 ACE_OS::closesocket (this->logging_output_
);
// Output-event hook taking an unnamed ACE_HANDLE.
// NOTE(review): the body is not visible in this extract.
319 ACE_Client_Logging_Handler::handle_output (ACE_HANDLE
)
324 // Encodes the contents of log_record object using network byte-order
325 // and sends it to the logging server.
// Visible flow:
//   1. Print the record locally (and again when <logging_output_> is
//      ACE_STDERR).
//   2. Marshal the record into an ACE_OutputCDR payload sized by
//      <max_payload_size>.
//   3. Build a header ACE_OutputCDR holding the byte order and the
//      payload length.
//   4. Send header and payload together with ACE::sendv_n().
//   5. If the send fails and ACE_Log_Msg has no msg_ostream, switch
//      <logging_output_> to ACE_STDERR.
// NOTE(review): braces, else branches, return statements and parts of
// several expressions are missing from this extract.
328 ACE_Client_Logging_Handler::send (ACE_Log_Record
&log_record
)
330 ostream
*orig_ostream
= ACE_Log_Msg::instance ()->msg_ostream ();
332 // This logic must occur before we do the encode() on <log_record>
333 // since otherwise the values of the <log_record> fields will be in
334 // network byte order.
337 log_record
.print (ACE_TEXT ("<localhost>"),
338 ACE_Log_Msg::instance ()->flags (),
341 if (this->logging_output_
== ACE_STDERR
)
343 log_record
.print (ACE_TEXT ("<localhost>"),
344 ACE_Log_Msg::instance ()->flags (),
349 // Serialize the log record using a CDR stream, allocate enough
350 // space for the complete <ACE_Log_Record>.
351 size_t const max_payload_size
=
356 + ACE_Log_Record::MAXLOGMSGLEN
// data
357 + ACE_CDR::MAX_ALIGNMENT
; // padding;
359 // Insert contents of <log_record> into payload stream.
360 ACE_OutputCDR
payload (max_payload_size
);
361 if (!(payload
<< log_record
))
363 ACE_ERROR ((LM_ERROR
,
364 ACE_TEXT ("Can't insert log_record\n")));
368 // Get the number of bytes used by the CDR stream.
369 ACE_CDR::ULong
const length
= payload
.total_length ();
371 // Send a header so the receiver can determine the byte order and
372 // size of the incoming CDR stream.
373 ACE_OutputCDR
header (ACE_CDR::MAX_ALIGNMENT
+ 8);
374 if (!(header
<< ACE_OutputCDR::from_boolean (ACE_CDR_BYTE_ORDER
)))
376 ACE_ERROR ((LM_ERROR
,
377 ACE_TEXT ("Can't insert byte order\n")));
381 // Store the size of the payload that follows
382 if (!(header
<< ACE_CDR::ULong (length
)))
384 ACE_ERROR ((LM_ERROR
,
385 ACE_TEXT ("Can't insert length\n")));
389 // Use an iovec to send both buffer and payload simultaneously.
// iov[0] carries the header and iov[1] the payload.
// NOTE(review): the declaration of <iov> and the assignment of
// iov[0].iov_len are on lines missing from this extract.
391 iov
[0].iov_base
= header
.begin ()->rd_ptr ();
393 iov
[1].iov_base
= payload
.begin ()->rd_ptr ();
394 iov
[1].iov_len
= length
;
396 // We're running over sockets, so send header and payload
397 // efficiently using "gather-write".
398 if (ACE::sendv_n (this->logging_output_
,iov
, 2) == -1)
400 ACE_DEBUG ((LM_DEBUG
,
401 "Something about the sendv_n() failed, so switch to stderr\n"));
403 if (ACE_Log_Msg::instance ()->msg_ostream () == 0)
404 // Switch over to logging to stderr for now. At some
405 // point, we'll improve the implementation to queue up the
406 // message, try to reestablish a connection, and then send
407 // the queued data once we've reconnected to the logging
408 // server. If you'd like to implement this functionality
409 // and contribute it back to ACE that would be great!
410 this->logging_output_
= ACE_STDERR
;
413 ACE_DEBUG ((LM_DEBUG
,
414 "Sent logging message %s successfully to Server Logging Daemon!\n",
415 log_record
.priority_name (ACE_Log_Priority (log_record
.type ()))));
// Acceptor for the client logging daemon, derived from
// ACE_Acceptor<ACE_Client_Logging_Handler, LOGGING_ACCEPTOR>.
// NOTE(review): the braces, the access specifiers and the fini()
// declaration are on lines missing from this extract.
421 class ACE_Client_Logging_Acceptor
: public ACE_Acceptor
<ACE_Client_Logging_Handler
, LOGGING_ACCEPTOR
>
424 // This factory creates connections with the
425 // <Server_Logging_Acceptor>.
428 // This class contains the service-specific methods that can't
429 // easily be factored into the <ACE_Acceptor>.
431 ACE_Client_Logging_Acceptor ();
432 // Default constructor.
435 // = Dynamic linking hooks.
436 virtual int init (int argc
, ACE_TCHAR
*argv
[]);
437 // Called when service is linked.
440 // Called when service is unlinked.
442 virtual int info (ACE_TCHAR
**strp
, size_t length
) const;
443 // Called to determine info about the service.
445 virtual int make_svc_handler (ACE_Client_Logging_Handler
*&sh
);
446 // Factory that always returns the <handler_>.
448 // = Scheduling hooks.
449 virtual int suspend ();
450 virtual int resume ();
453 int parse_args (int argc
, ACE_TCHAR
*argv
[]);
454 // Parse svc.conf arguments.
456 const ACE_TCHAR
*server_host_
;
457 // Host where the logging server is located.
459 u_short server_port_
;
460 // Port number where the logging server is listening for
// NOTE(review): the continuation of the comment above is missing from this extract.
463 ACE_INET_Addr server_addr_
;
464 // Address to connect to the server logging daemon.
466 ACE_INET_Addr local_addr_
;
467 // Local IP/port number to use for the connection to the server logging
// NOTE(review): the continuation of the comment above is missing from this extract.
470 const ACE_TCHAR
*logger_key_
;
471 // Communication endpoint where the client logging daemon will
472 // listen for connections from clients.
474 ACE_Client_Logging_Handler
*handler_
;
475 // Pointer to the singleton handler that receives messages from
476 // clients and forwards to the server.
// Service shutdown: closes <handler_> if it is non-null, unlinks the
// logger key, and frees the <logger_key_> and <server_host_> strings.
// NOTE(review): the first statements of the body and the return statement
// are on lines missing from this extract.
480 ACE_Client_Logging_Acceptor::fini ()
484 if (this->handler_
!= 0)
485 this->handler_
->close (0);
487 // Try to unlink the logger key so weird things don't happen if
488 // we're using STREAM pipes.
489 ACE_OS::unlink (this->logger_key_
);
491 // This memory was allocated by <ACE_OS::strdup>.
492 ACE_OS::free ((void *) this->logger_key_
);
494 ACE_OS::free ((void *) this->server_host_
);
// Service-handler factory hook; <sh> is a reference to the caller's
// handler pointer.
// NOTE(review): the assignment to <sh> and the return statement are on
// lines missing from this extract.
500 ACE_Client_Logging_Acceptor::make_svc_handler (ACE_Client_Logging_Handler
*&sh
)
502 // Always return a pointer to the Singleton handler.
// Describes the service: formats the server port, "tcp" and a short
// description into a local buffer, makes it available through <*strp>
// (strdup'ing it when <*strp> is null, strncpy'ing up to <length>
// characters in the visible copy call), and returns the length of the
// formatted string.
// NOTE(review): the failure return and the else keyword are on lines
// missing from this extract.
508 ACE_Client_Logging_Acceptor::info (ACE_TCHAR
**strp
, size_t length
) const
510 ACE_TCHAR buf
[BUFSIZ
];
// NOTE(review): "tcp" and the description are narrow literals passed to an
// ACE_TEXT format string -- confirm this is correct in wide-character builds.
512 ACE_OS::sprintf (buf
, ACE_TEXT ("%d/%s %s"),
513 this->server_addr_
.get_port_number (), "tcp",
514 "# client logging daemon\n");
516 if (*strp
== 0 && (*strp
= ACE_OS::strdup (buf
)) == 0)
519 ACE_OS::strncpy (*strp
, buf
, length
);
520 return ACE_OS::strlen (buf
);
// Default constructor: strdup()s ACE_DEFAULT_SERVER_HOST and
// ACE_DEFAULT_LOGGER_KEY into <server_host_> and <logger_key_>, and sets
// <server_port_> to ACE_DEFAULT_LOGGING_SERVER_PORT.
// NOTE(review): the rest of the initializer list and the body are on
// lines missing from this extract.
523 ACE_Client_Logging_Acceptor::ACE_Client_Logging_Acceptor ()
524 : server_host_ (ACE_OS::strdup (ACE_DEFAULT_SERVER_HOST
)),
525 server_port_ (ACE_DEFAULT_LOGGING_SERVER_PORT
),
526 logger_key_ (ACE_OS::strdup (ACE_DEFAULT_LOGGER_KEY
)),
// Service initialization. Visible flow:
//   1. Open ACE_LOG_MSG under the name "Client Logging Service".
//   2. Parse the arguments with parse_args().
//   3. Unlink the logger key and open the acceptor on
//      LOGGING_ADDR (<logger_key_>).
//   4. Log the local address the acceptor is bound to (a STREAM path or a
//      port, depending on ACE_HAS_STREAM_LOG_MSG_IPC).
//   5. Connect to the logging server; on failure log the error and, when
//      ACE_Log_Msg has no msg_ostream, point <stream> at ACE_STDERR.
//   6. Log the remote address that was connected to.
//   7. Allocate the singleton ACE_Client_Logging_Handler on <stream>'s
//      handle.
// NOTE(review): braces, #else, else branches, several macro arguments and
// the return statements are on lines missing from this extract.
532 ACE_Client_Logging_Acceptor::init (int argc
, ACE_TCHAR
*argv
[])
534 // We'll log *our* error and debug messages to stderr!
535 if (ACE_LOG_MSG
->open (ACE_TEXT ("Client Logging Service")) == -1)
536 ACE_ERROR_RETURN ((LM_ERROR
,
537 ACE_TEXT ("Can't open ACE_Log_Msg\n")),
540 // Use the options hook to parse the command line arguments and set
// NOTE(review): the continuation of the comment above is missing from this
// extract. The result of parse_args() is discarded in the visible
// statement below -- confirm that this is intended.
542 this->parse_args (argc
, argv
);
544 // Try to unlink the logger key so weird things don't happen if
545 // we're using STREAM pipes.
546 ACE_OS::unlink (this->logger_key_
);
548 // Initialize the acceptor endpoint.
549 if (this->open (LOGGING_ADDR (this->logger_key_
)) == -1)
550 ACE_ERROR_RETURN ((LM_ERROR
,
555 // Establish connection with the server.
556 ACE_SOCK_Connector con
;
557 ACE_SOCK_Stream stream
;
558 ACE_INET_Addr server_addr
;
560 #if (ACE_HAS_STREAM_LOG_MSG_IPC == 1)
561 ACE_SPIPE_Addr lserver_addr
;
563 // Figure out what local port we're really bound to.
564 if (this->acceptor ().get_local_addr (lserver_addr
) == -1)
565 ACE_ERROR_RETURN ((LM_ERROR
,
567 ACE_TEXT ("get_local_addr")),
570 ACE_DEBUG ((LM_DEBUG
,
571 ACE_TEXT ("Starting up Client Logging Daemon, ")
572 ACE_TEXT ("bounded to STREAM addr %s on handle %u\n"),
573 lserver_addr
.get_path_name (),
574 this->acceptor ().get_handle ()));
576 ACE_INET_Addr lserver_addr
;
578 // Figure out what local port we're really bound to.
579 if (this->acceptor ().get_local_addr (lserver_addr
) == -1)
580 ACE_ERROR_RETURN ((LM_ERROR
,
582 ACE_TEXT ("get_local_addr")),
585 ACE_DEBUG ((LM_DEBUG
,
586 ACE_TEXT ("Starting up Client Logging Daemon, ")
587 ACE_TEXT ("bounded to local port %d on handle %u\n"),
588 lserver_addr
.get_port_number (),
589 this->acceptor ().get_handle ()));
590 #endif /* ACE_HAS_STREAM_LOG_MSG_IPC == 1 */
// Connect to the logging server; <local_addr_> is the last visible
// argument of the connect() call.
592 if (con
.connect (stream
,
595 this->local_addr_
) == -1)
597 ACE_ERROR ((LM_ERROR
,
598 ACE_TEXT ("Can't connect to logging server %C on port %d: ")
599 ACE_TEXT ("%m, using stderr\n"),
600 this->server_addr_
.get_host_name (),
601 this->server_addr_
.get_port_number (),
604 if (ACE_Log_Msg::instance ()->msg_ostream () == 0)
605 // If we can't connect to the server then we'll send the logging
606 // messages to stderr.
607 stream
.set_handle (ACE_STDERR
);
611 // Figure out what remote port we're really bound to.
612 if (stream
.get_remote_addr (server_addr
) == -1)
613 ACE_ERROR_RETURN ((LM_ERROR
,
615 ACE_TEXT ("get_remote_addr")),
617 ACE_DEBUG ((LM_DEBUG
,
618 ACE_TEXT ("Client Logging Daemon is connected to Server ")
619 ACE_TEXT ("Logging Daemon %C on port %d on handle %u\n"),
620 server_addr
.get_host_name (),
621 server_addr
.get_port_number (),
622 stream
.get_handle ()));
625 // Create the Singleton <Client_Logging_Handler>.
626 ACE_NEW_RETURN (this->handler_
,
627 ACE_Client_Logging_Handler (stream
.get_handle ()),
// Parses the service arguments with ACE_Get_Opt using the option string
// "h:k:p:l:". The visible option actions replace <server_host_> and
// <logger_key_> (free + strdup), set <server_port_> via atoi, and capture
// a local address string. After the loop <local_addr_> is reset to "any"
// and, if a local address string was given, parsed from it; finally
// <server_addr_> is set from <server_port_> and <server_host_>.
// NOTE(review): the switch/case labels, braces and return statements are
// on lines missing from this extract; presumably the actions map to
// -h, -k, -p and -l in that order -- confirm.
633 ACE_Client_Logging_Acceptor::parse_args (int argc
, ACE_TCHAR
*argv
[])
635 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("h:k:p:l:"), 0);
636 ACE_TString local_addr_str
;
638 for (int c
; (c
= get_opt ()) != -1; )
643 ACE_OS::free ((void *) this->server_host_
);
644 this->server_host_
= ACE_OS::strdup (get_opt
.opt_arg ());
647 ACE_OS::free ((void *) this->logger_key_
);
648 this->logger_key_
= ACE_OS::strdup (get_opt
.opt_arg ());
651 this->server_port_
= ACE_OS::atoi (get_opt
.opt_arg ());
654 local_addr_str
= get_opt
.opt_arg ();
657 ACE_ERROR_RETURN ((LM_ERROR
,
658 ACE_TEXT ("%n:\n[-p server-port]\n")
659 ACE_TEXT ("[-l local-ip[:local-port]]\n")),
664 this->local_addr_
.set ((u_short
)0); // "any"
665 if (local_addr_str
.length () > 0)
// Append ":0" when the string contains no ':' so it has a port part.
667 if (local_addr_str
.rfind (ACE_TCHAR(':')) == ACE_TString::npos
)
668 local_addr_str
+= ACE_TEXT (":0");
// The buffer obtained from rep() is released with delete [] below.
669 ACE_TCHAR
*local_addr_cstr
= local_addr_str
.rep ();
670 if (-1 == local_addr_
.string_to_addr (ACE_TEXT_ALWAYS_CHAR (local_addr_cstr
)))
671 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), local_addr_cstr
));
672 delete [] local_addr_cstr
;
675 if (this->server_addr_
.set (this->server_port_
,
676 this->server_host_
) == -1)
677 ACE_ERROR_RETURN ((LM_ERROR
,
// Scheduling hook (suspend).
// NOTE(review): the return type and body are on lines missing from this extract.
685 ACE_Client_Logging_Acceptor::suspend ()
// Scheduling hook (resume).
// NOTE(review): the return type and body are on lines missing from this extract.
692 ACE_Client_Logging_Acceptor::resume ()
698 // The following is a "Factory" used by the ACE_Service_Config and
699 // svc.conf file to dynamically initialize the state of the
700 // single-threaded logging server.
// NOTE(review): the comment above says "logging server", but the factory
// is defined for ACE_Client_Logging_Acceptor -- confirm the wording.
702 ACE_SVC_FACTORY_DEFINE (ACE_Client_Logging_Acceptor
)