1 #include "ace/Get_Opt.h"
2 #include "TS_Clerk_Handler.h"
3 #include "ace/Lib_Find.h"
4 #include "ace/Signal.h"
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/OS_NS_string.h"
7 #include "ace/OS_NS_time.h"
8 #include "ace/os_include/os_netdb.h"
10 ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor
*processor
,
12 : state_ (ACE_TS_Clerk_Handler::IDLE
),
13 timeout_ (ACE_DEFAULT_TIMEOUT
),
14 max_timeout_ (ACE_TS_Clerk_Handler::MAX_RETRY_TIMEOUT
),
16 processor_ (processor
)
18 ACE_TRACE ("ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler");
19 this->time_info_
.delta_time_
= 0;
20 this->time_info_
.sequence_num_
= 0;
23 // Set the connection state
25 ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state
)
27 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
31 // Get the connection state
32 ACE_TS_Clerk_Handler::State
33 ACE_TS_Clerk_Handler::state (void)
35 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
39 // Sets the timeout delay.
41 ACE_TS_Clerk_Handler::timeout (long to
)
43 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
44 if (to
> this->max_timeout_
)
45 to
= this->max_timeout_
;
50 // Recalculate the current retry timeout delay using exponential
51 // backoff. Returns the original timeout (i.e., before the
54 ACE_TS_Clerk_Handler::timeout (void)
56 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
57 long old_timeout
= this->timeout_
;
60 if (this->timeout_
> this->max_timeout_
)
61 this->timeout_
= this->max_timeout_
;
66 // This is called when a <send> to the logging server fails...
69 ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t
*, ucontext_t
*)
74 // Set the max timeout delay.
76 ACE_TS_Clerk_Handler::max_timeout (long mto
)
78 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
79 this->max_timeout_
= mto
;
82 // Gets the max timeout delay.
84 ACE_TS_Clerk_Handler::max_timeout (void)
86 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
87 return this->max_timeout_
;
91 ACE_TS_Clerk_Handler::open (void *)
93 ACE_TRACE ("ACE_TS_Clerk_Handler::open");
94 ACE_INET_Addr server_addr
;
96 // Set connection state as established
97 this->state (ACE_TS_Clerk_Handler::ESTABLISHED
);
99 // Register ourselves to receive SIGPIPE so we can attempt
101 #if !defined (ACE_WIN32)
102 if (ACE_Reactor::instance ()->register_handler (SIGPIPE
, this) == -1)
103 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%n: %p\n"),
104 ACE_TEXT ("register_handler (SIGPIPE)")), -1);
105 #endif /* ACE_WIN32 */
107 // Register ourselves with the reactor to receive input
108 if (ACE_Reactor::instance ()->register_handler (this->get_handle (),
110 ACE_Event_Handler::READ_MASK
|
111 ACE_Event_Handler::EXCEPT_MASK
) == -1)
112 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%n: %p\n"),
113 ACE_TEXT ("register_handler (this)")));
115 // Figure out what remote port we're really bound to.
116 else if (this->peer ().get_remote_addr (server_addr
) == -1)
117 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
118 ACE_TEXT ("get_remote_addr")),
121 ACE_DEBUG ((LM_DEBUG
,
122 ACE_TEXT ("TS Clerk Daemon connected to port %d on handle %d\n"),
123 server_addr
.get_port_number (),
124 this->peer ().get_handle ()));
130 ACE_TS_Clerk_Handler::get_handle (void) const
132 ACE_TRACE ("ACE_TS_Clerk_Handler::get_handle");
133 return this->peer().get_handle ();
137 ACE_TS_Clerk_Handler::handle_close (ACE_HANDLE
,
138 ACE_Reactor_Mask mask
)
140 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_close");
141 ACE_UNUSED_ARG (mask
);
143 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) shutting down on handle %d\n"),
144 this->get_handle ()));
146 return this->reinitiate_connection ();
150 ACE_TS_Clerk_Handler::reinitiate_connection (void)
152 ACE_TRACE ("ACE_TS_Clerk_Handler::reinitiate_connection");
153 // Skip over deactivated descriptors.
155 // Set state to connecting so that we don't try to send anything
156 // using this handler
157 this->state (ACE_TS_Clerk_Handler::CONNECTING
);
158 if (this->get_handle () != ACE_INVALID_HANDLE
)
160 ACE_DEBUG ((LM_DEBUG
,
161 ACE_TEXT ("(%t) Scheduling reinitiation of connection\n")));
163 // Reschedule ourselves to try and connect again.
164 ACE_Time_Value
const timeout (this->timeout ());
165 if (ACE_Reactor::instance ()->schedule_timer (this, 0,
167 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
168 ACE_TEXT ("schedule_timer")), -1);
173 // Receive a time update from a server
175 ACE_TS_Clerk_Handler::handle_input (ACE_HANDLE
)
177 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_input");
178 // We're getting a time update message from a server
179 ACE_Time_Request reply
;
180 if (this->recv_reply (reply
) != 0)
184 // Get current local time
185 time_t local_time
= ACE_OS::time (0);
187 // Compure delta time (difference between current local time and
188 // system time obtained from the server)
189 time_t t
= reply
.time () - local_time
;
191 // Compute round trip delay and adjust time accordingly
192 time_t one_way_time
= (local_time
- this->start_time_
)/2;
195 // Now update time info (to be retrieved by Clerk_Processor)
196 this->time_info_
.delta_time_
= t
;
197 this->time_info_
.sequence_num_
= this->cur_sequence_num_
;
202 // Restart connection asynchronously when timeout occurs.
204 ACE_TS_Clerk_Handler::handle_timeout (const ACE_Time_Value
&,
207 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_timeout");
208 ACE_DEBUG ((LM_DEBUG
,
209 ACE_TEXT ("(%t) attempting to reconnect to server with timeout = %d\n"),
212 // Close down peer to reclaim descriptor if need be. Note this is
213 // necessary to reconnect.
214 this->peer ().close ();
216 return this->processor_
->initiate_connection (this, ACE_Synch_Options::asynch
);
220 ACE_TS_Clerk_Handler::remote_addr (ACE_INET_Addr
&addr
)
222 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
223 this->remote_addr_
= addr
;
227 ACE_TS_Clerk_Handler::remote_addr (void)
229 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
230 return this->remote_addr_
;
234 ACE_TS_Clerk_Handler::recv_reply (ACE_Time_Request
&reply
)
236 ACE_TRACE ("ACE_TS_Clerk_Handler::recv_reply");
237 const int bytes_expected
= reply
.size ();
239 // Since Time_Request messages are fixed size, read the entire
240 // message in one go.
241 ssize_t n
= this->peer ().recv ((void *) &reply
, bytes_expected
);
243 if (n
!= bytes_expected
)
248 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("****************** recv_reply returned -1\n")));
251 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p got %d bytes, expected %d bytes\n"),
252 ACE_TEXT ("recv failed"), n
, bytes_expected
));
255 // We've shutdown unexpectedly
260 else if (reply
.decode () == -1) // Decode the request into host byte order.
261 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
262 ACE_TEXT ("decode failed")), -1);
268 ACE_TS_Clerk_Handler::send_request (ACE_UINT32 sequence_num
, ACE_Time_Info
&time_info
)
270 ACE_TRACE ("ACE_TS_Clerk_Handler::send_request");
274 // Update current sequence number
275 this->cur_sequence_num_
= sequence_num
;
277 // First update the current time info.
278 time_info
.delta_time_
= this->time_info_
.delta_time_
;
279 time_info
.sequence_num_
= this->time_info_
.sequence_num_
;
281 // Now prepare a new time update request
282 ACE_Time_Request
request (ACE_Time_Request::TIME_UPDATE
, 0, 0);
284 if ((length
= request
.encode (buffer
)) == -1)
285 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
286 ACE_TEXT ("encode failed")), -1);
288 // Compute start time of sending request (needed to compute
290 this->start_time_
= ACE_OS::time (0);
293 if (this->peer ().send_n (buffer
, length
) != length
)
294 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
295 ACE_TEXT ("send_n failed")),
301 ACE_TS_Clerk_Processor::ACE_TS_Clerk_Processor ()
302 : timeout_ (ACE_DEFAULT_TIMEOUT
),
303 blocking_semantics_ (0),
304 cur_sequence_num_ (0)
306 #if defined (ACE_DEFAULT_BACKING_STORE)
307 // Create a temporary file.
308 ACE_OS::strcpy (this->poolname_
,
309 ACE_DEFAULT_BACKING_STORE
);
310 #else /* ACE_DEFAULT_BACKING_STORE */
311 if (ACE::get_temp_dir (this->poolname_
,
312 MAXPATHLEN
- 17) == -1) // -17 for ace-malloc-XXXXXX
314 ACE_ERROR ((LM_ERROR
,
315 ACE_TEXT ("Temporary path too long, ")
316 ACE_TEXT ("defaulting to current directory\n")));
317 this->poolname_
[0] = 0;
320 // Add the filename to the end
321 ACE_OS::strcat (this->poolname_
, ACE_TEXT ("ace-malloc-XXXXXX"));
323 #endif /* ACE_DEFAULT_BACKING_STORE */
327 ACE_TS_Clerk_Processor::alloc (void)
329 ACE_TRACE ("ACE_TS_Clerk_Processor::alloc");
330 ACE_NEW (this->shmem_
, ALLOCATOR (this->poolname_
));
334 // Only create the state if it doesn't already exist.
335 if (this->shmem_
->find (ACE_DEFAULT_TIME_SERVER_STR
, temp
) == -1)
337 // Allocate the space out of shared memory for the system time entry
338 temp
= (this->shmem_
->malloc (2 * sizeof (time_t)));
340 // Give it a name binding
341 this->shmem_
->bind (ACE_DEFAULT_TIME_SERVER_STR
, temp
);
344 // Set up pointers. Note that we add one to get to the second
345 // field in the structure
346 time_t *time_p
= (time_t *)temp
;
347 this->system_time_
.delta_time_
= time_p
;
348 this->system_time_
.last_local_time_
= time_p
+ 1;
351 *(this->system_time_
.delta_time_
) = 0;
352 *(this->system_time_
.last_local_time_
) = ACE_OS::time (0);
355 // Query the servers for the latest time
357 ACE_TS_Clerk_Processor::handle_timeout (const ACE_Time_Value
&,
360 ACE_TRACE ("ACE_TS_Clerk_Processor::handle_timeout");
361 return this->update_time ();
365 ACE_TS_Clerk_Processor::update_time ()
367 ACE_TRACE ("ACE_TS_Clerk_Processor::update_time");
368 ACE_UINT32 expected_sequence_num
= this->cur_sequence_num_
;
370 // Increment sequence number
371 this->cur_sequence_num_
++;
374 time_t total_delta
= 0;
375 ACE_Time_Info time_info
;
377 // Call send_request() on all handlers
378 ACE_TS_Clerk_Handler
**handler
= 0;
380 for (HANDLER_SET_ITERATOR
set_iterator (this->handler_set_
);
381 set_iterator
.next (handler
) != 0;
382 set_iterator
.advance ())
384 if ((*handler
)->state () == ACE_TS_Clerk_Handler::ESTABLISHED
)
386 if ((*handler
)->send_request (this->cur_sequence_num_
, time_info
) == -1)
388 // Check if sequence numbers match; otherwise discard
389 else if (expected_sequence_num
!= 0 &&
390 time_info
.sequence_num_
== expected_sequence_num
)
393 ACE_DEBUG ((LM_DEBUG
,
394 ACE_TEXT ("[%d] Delta time: %d\n"),
395 count
, time_info
.delta_time_
));
397 // #### Can check here if delta value falls within a threshold ####
398 total_delta
+= time_info
.delta_time_
;
402 // Update system_time_ using average of times obtained from all the servers.
403 // Note that we are keeping two things in shared memory: the delta
404 // time (difference between our system clock and the local clock),
405 // and the last local time
408 // At least one server is out there
409 *(this->system_time_
.delta_time_
) = total_delta
/count
;
413 // No servers are out there (or this is the first time around
414 // computing the time) so set delta time to zero. This
415 // would mean that clients would use the actual local system time.
416 *(this->system_time_
.delta_time_
) = 0;
418 // Update the last local time
419 *(this->system_time_
.last_local_time_
) = ACE_OS::time (0);
421 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Average delta time: %d\n"),
422 (int)(*(this->system_time_
.delta_time_
))));
428 ACE_TS_Clerk_Processor::fini (void)
430 ACE_TRACE ("ACE_TS_Clerk_Processor::fini");
433 if (this->timer_id_
!= -1)
434 ACE_Reactor::instance ()->cancel_timer (this->timer_id_
);
436 // Destroy all the handlers
437 ACE_TS_Clerk_Handler
**handler
= 0;
439 for (HANDLER_SET_ITERATOR
set_iterator (this->handler_set_
);
440 set_iterator
.next (handler
) != 0;
441 set_iterator
.advance ())
443 if ((*handler
)->state () != ACE_TS_Clerk_Handler::IDLE
)
444 // Mark state as DISCONNECTING so we don't try to reconnect...
445 (*handler
)->state (ACE_TS_Clerk_Handler::DISCONNECTING
);
447 // Deallocate resources.
448 (*handler
)->destroy (); // Will trigger a delete
451 // Remove the backing store
452 this->shmem_
->remove ();
454 ACE_Connector
<ACE_TS_Clerk_Handler
, ACE_SOCK_CONNECTOR
>::fini ();
460 ACE_TS_Clerk_Processor::info (ACE_TCHAR
**, size_t) const
462 ACE_TRACE ("ACE_TS_Clerk_Processor::info");
467 ACE_TS_Clerk_Processor::init (int argc
, ACE_TCHAR
*argv
[])
469 ACE_TRACE ("ACE_TS_Clerk_Processor::init");
470 // Use the options hook to parse the command line arguments and set
472 this->parse_args (argc
, argv
);
476 #if !defined (ACE_WIN32)
477 // Ignore SIPPIPE so each Output_Channel can handle it.
478 ACE_Sig_Action
sig ((ACE_SignalHandler
) SIG_IGN
, SIGPIPE
);
479 ACE_UNUSED_ARG (sig
);
480 #endif /* ACE_WIN32 */
482 ACE_Synch_Options
&synch_options
= this->blocking_semantics_
== 0
483 ? ACE_Synch_Options::asynch
: ACE_Synch_Options::synch
;
485 // Now set up connections to all servers
486 ACE_TS_Clerk_Handler
**handler
= 0;
488 for (HANDLER_SET_ITERATOR
set_iterator (this->handler_set_
);
489 set_iterator
.next (handler
) != 0;
490 set_iterator
.advance ())
492 this->initiate_connection (*handler
, synch_options
);
494 // Now set up timer to receive updates from server
495 // set the timer to go off after timeout value
496 this->timer_id_
= ACE_Reactor::instance ()->schedule_timer (this,
498 ACE_Time_Value (this->timeout_
),
499 ACE_Time_Value (this->timeout_
));
504 ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler
*handler
,
505 ACE_Synch_Options
&synch_options
)
507 ACE_TRACE ("ACE_TS_Clerk_Processor::initiate_connection");
508 ACE_TCHAR buf
[MAXHOSTNAMELEN
+ 1];
510 // Mark ourselves as idle so that the various iterators will ignore
511 // us until we are connected/reconnected.
512 handler
->state (ACE_TS_Clerk_Handler::IDLE
);
514 if (handler
->remote_addr ().addr_to_string (buf
, MAXHOSTNAMELEN
) == -1)
515 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
516 ACE_TEXT ("can't obtain peer's address")), -1);
518 // Establish connection with the server.
519 if (this->connect (handler
,
520 handler
->remote_addr (),
521 synch_options
) == -1)
523 if (errno
!= EWOULDBLOCK
)
525 handler
->state (ACE_TS_Clerk_Handler::FAILED
);
526 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) %p on address %s\n"),
527 ACE_TEXT ("connect"), buf
));
529 // Reschedule ourselves to try and connect again.
530 if (synch_options
[ACE_Synch_Options::USE_REACTOR
])
532 ACE_Time_Value
const handler_timeout (handler
->timeout ());
533 if (ACE_Reactor::instance ()->schedule_timer (handler
,
535 handler_timeout
) == -1)
536 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
537 ACE_TEXT ("schedule_timer")), -1);
540 // Failures on synchronous connects are reported as errors
541 // so that the caller can decide how to proceed.
546 handler
->state (ACE_TS_Clerk_Handler::CONNECTING
);
547 ACE_DEBUG ((LM_DEBUG
,
548 ACE_TEXT ("(%t) in the process of connecting %s to %s\n"),
549 synch_options
[ACE_Synch_Options::USE_REACTOR
] ?
550 ACE_TEXT ("asynchronously") : ACE_TEXT ("synchronously"),
556 handler
->state (ACE_TS_Clerk_Handler::ESTABLISHED
);
557 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) connected to %s on %d\n"),
558 buf
, handler
->get_handle ()));
564 ACE_TS_Clerk_Processor::parse_args (int argc
, ACE_TCHAR
*argv
[])
566 ACE_TRACE ("ACE_TS_Clerk_Processor::parse_args");
567 ACE_INET_Addr server_addr
;
568 ACE_TS_Clerk_Handler
*handler
;
570 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("h:t:p:b"), 0);
572 for (int c
; (c
= get_opt ()) != -1; )
577 // Get the hostname:port and create an ADDR
578 server_addr
.set (get_opt
.opt_arg ());
580 // Create a new handler
581 ACE_NEW_RETURN (handler
,
582 ACE_TS_Clerk_Handler (this, server_addr
),
586 this->handler_set_
.insert (handler
);
589 // Get the timeout value
590 this->timeout_
= ACE_OS::atoi (get_opt
.opt_arg ());
594 ACE_OS::strncpy (this->poolname_
,
596 sizeof this->poolname_
/ sizeof (ACE_TCHAR
));
599 // Blocking semantics
600 this->blocking_semantics_
= 1;
603 ACE_ERROR_RETURN ((LM_ERROR
,
604 ACE_TEXT ("%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n")),
612 ACE_TS_Clerk_Processor::suspend (void)
614 ACE_TRACE ("ACE_TS_Clerk_Processor::suspend");
619 ACE_TS_Clerk_Processor::resume (void)
621 ACE_TRACE ("ACE_TS_Clerk_Processor::resume");
625 // The following is a "Factory" used by the ACE_Service_Config and
626 // svc.conf file to dynamically initialize the state of the TS_Clerk.
628 ACE_SVC_FACTORY_DEFINE (ACE_TS_Clerk_Processor
)