1 #include "ace/Get_Opt.h"
2 #include "TS_Clerk_Handler.h"
3 #include "ace/Lib_Find.h"
4 #include "ace/Signal.h"
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/OS_NS_string.h"
7 #include "ace/OS_NS_time.h"
8 #include "ace/os_include/os_netdb.h"
// Constructor. Starts the handler in the IDLE state with the default
// retry timeout, uses MAX_RETRY_TIMEOUT as the ceiling for retries and
// remembers the processor it was handed.
10 ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor
*processor
,
12 : state_ (ACE_TS_Clerk_Handler::IDLE
),
13 timeout_ (ACE_DEFAULT_TIMEOUT
),
14 max_timeout_ (ACE_TS_Clerk_Handler::MAX_RETRY_TIMEOUT
),
16 processor_ (processor
)
18 ACE_TRACE ("ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler");
// The cached time info starts out zeroed; handle_input() overwrites
// both fields when it processes a reply.
19 this->time_info_
.delta_time_
= 0;
20 this->time_info_
.sequence_num_
= 0;
23 // Set the connection state.
// Callers in this file pass IDLE, CONNECTING, ESTABLISHED, FAILED and
// DISCONNECTING.
25 ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state
)
27 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
31 // Get the connection state.
// The processor compares the result against ESTABLISHED in
// update_time() and against IDLE in fini().
32 ACE_TS_Clerk_Handler::State
33 ACE_TS_Clerk_Handler::state ()
35 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
39 // Sets the timeout delay.
// A requested value above max_timeout_ is reduced to max_timeout_.
41 ACE_TS_Clerk_Handler::timeout (long to
)
43 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
// Clamp the requested delay to the configured ceiling.
44 if (to
> this->max_timeout_
)
45 to
= this->max_timeout_
;
50 // Recalculate the current retry timeout delay using exponential
51 // backoff. Returns the original timeout (i.e., before the
// recalculation).
54 ACE_TS_Clerk_Handler::timeout ()
56 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
// Remember the delay that was in effect on entry.
57 long old_timeout
= this->timeout_
;
// Never let the stored delay exceed max_timeout_.
60 if (this->timeout_
> this->max_timeout_
)
61 this->timeout_
= this->max_timeout_
;
66 // This is called when a <send> to the logging server fails...
// NOTE(review): "logging server" looks like a leftover from another
// handler; the peer here sends time updates (see handle_input) --
// confirm and reword.
// All three arguments are unnamed. open() registers this handler for
// SIGPIPE on non-Windows builds.
68 ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t
*, ucontext_t
*)
73 // Set the max timeout delay.
// The value is stored as given; it is the ceiling that timeout(long)
// and timeout() compare against.
75 ACE_TS_Clerk_Handler::max_timeout (long mto
)
77 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
78 this->max_timeout_
= mto
;
81 // Gets the max timeout delay.
// Returns the current value of max_timeout_.
83 ACE_TS_Clerk_Handler::max_timeout ()
85 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
86 return this->max_timeout_
;
// Marks the connection ESTABLISHED, registers this handler for SIGPIPE
// (non-Windows builds only) and for READ/EXCEPT events on its handle,
// then logs the remote port and handle. The void* argument is unnamed.
// A failed SIGPIPE registration returns -1, whereas a failed input
// registration is only logged.
90 ACE_TS_Clerk_Handler::open (void *)
92 ACE_TRACE ("ACE_TS_Clerk_Handler::open");
93 ACE_INET_Addr server_addr
;
95 // Set connection state as established
96 this->state (ACE_TS_Clerk_Handler::ESTABLISHED
);
98 // Register ourselves to receive SIGPIPE so we can attempt
// reconnections.
100 #if !defined (ACE_WIN32)
101 if (ACE_Reactor::instance ()->register_handler (SIGPIPE
, this) == -1)
102 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%n: %p\n"),
103 ACE_TEXT ("register_handler (SIGPIPE)")), -1);
104 #endif /* ACE_WIN32 */
106 // Register ourselves with the reactor to receive input
107 if (ACE_Reactor::instance ()->register_handler (this->get_handle (),
109 ACE_Event_Handler::READ_MASK
|
110 ACE_Event_Handler::EXCEPT_MASK
) == -1)
111 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%n: %p\n"),
112 ACE_TEXT ("register_handler (this)")));
114 // Figure out what remote port we're really bound to.
115 else if (this->peer ().get_remote_addr (server_addr
) == -1)
116 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
117 ACE_TEXT ("get_remote_addr")),
// Report the server port and the local handle of the new connection.
120 ACE_DEBUG ((LM_DEBUG
,
121 ACE_TEXT ("TS Clerk Daemon connected to port %d on handle %d\n"),
122 server_addr
.get_port_number (),
123 this->peer ().get_handle ()));
// Returns the handle of the underlying peer stream.
129 ACE_TS_Clerk_Handler::get_handle () const
131 ACE_TRACE ("ACE_TS_Clerk_Handler::get_handle");
132 return this->peer().get_handle ();
// Logs the shutdown of this handler's handle and then returns the
// result of reinitiate_connection(). The handle argument is unnamed and
// the mask is explicitly marked as unused.
136 ACE_TS_Clerk_Handler::handle_close (ACE_HANDLE
,
137 ACE_Reactor_Mask mask
)
139 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_close");
140 ACE_UNUSED_ARG (mask
);
142 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) shutting down on handle %d\n"),
143 this->get_handle ()));
// Closing is treated as a cue to start reconnecting.
145 return this->reinitiate_connection ();
// Switches the handler to CONNECTING and, when its handle is still
// valid, schedules a reactor timer on this handler so that the
// connection is retried later. Returns -1 if the timer cannot be
// scheduled.
149 ACE_TS_Clerk_Handler::reinitiate_connection ()
151 ACE_TRACE ("ACE_TS_Clerk_Handler::reinitiate_connection");
152 // Skip over deactivated descriptors.
154 // Set state to connecting so that we don't try to send anything
155 // using this handler
156 this->state (ACE_TS_Clerk_Handler::CONNECTING
);
157 if (this->get_handle () != ACE_INVALID_HANDLE
)
159 ACE_DEBUG ((LM_DEBUG
,
160 ACE_TEXT ("(%t) Scheduling reinitiation of connection\n")));
162 // Reschedule ourselves to try and connect again.
// The no-argument timeout() supplies the delay for this attempt.
163 ACE_Time_Value
const timeout (this->timeout ());
164 if (ACE_Reactor::instance ()->schedule_timer (this, 0,
166 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
167 ACE_TEXT ("schedule_timer")), -1);
172 // Receive a time update from a server
// Reads one reply, derives the difference between the server's time and
// the local clock, and caches it together with the sequence number of
// the request currently in flight. The handle argument is unnamed.
174 ACE_TS_Clerk_Handler::handle_input (ACE_HANDLE
)
176 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_input");
177 // We're getting a time update message from a server
178 ACE_Time_Request reply
;
// recv_reply() is expected to yield 0; any other value takes this
// branch.
179 if (this->recv_reply (reply
) != 0)
183 // Get current local time
184 time_t local_time
= ACE_OS::time (0);
186 // Compute delta time (difference between current local time and
187 // system time obtained from the server)
188 time_t t
= reply
.time () - local_time
;
190 // Compute round trip delay and adjust time accordingly
// Half of the time elapsed since send_request() stamped start_time_.
191 time_t one_way_time
= (local_time
- this->start_time_
)/2;
194 // Now update time info (to be retrieved by Clerk_Processor)
195 this->time_info_
.delta_time_
= t
;
// Tag the result with the sequence number recorded by send_request().
196 this->time_info_
.sequence_num_
= this->cur_sequence_num_
;
201 // Restart connection asynchronously when timeout occurs.
// Returns whatever the processor's initiate_connection() returns for
// this handler.
203 ACE_TS_Clerk_Handler::handle_timeout (const ACE_Time_Value
&,
206 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_timeout");
207 ACE_DEBUG ((LM_DEBUG
,
208 ACE_TEXT ("(%t) attempting to reconnect to server with timeout = %d\n"),
211 // Close down peer to reclaim descriptor if need be. Note this is
212 // necessary to reconnect.
213 this->peer ().close ();
// Always reconnect with the asynchronous options, regardless of how the
// processor was configured.
215 return this->processor_
->initiate_connection (this, ACE_Synch_Options::asynch
);
// Stores a copy of addr as this handler's server address; the processor
// passes it to connect() in initiate_connection().
219 ACE_TS_Clerk_Handler::remote_addr (ACE_INET_Addr
&addr
)
221 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
222 this->remote_addr_
= addr
;
// Returns the stored server address (remote_addr_).
226 ACE_TS_Clerk_Handler::remote_addr ()
228 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
229 return this->remote_addr_
;
// Reads one reply from the peer into `reply` and decodes it.
// Exactly reply.size() bytes are expected from a single recv(); any
// other count is logged. A decode failure returns -1.
233 ACE_TS_Clerk_Handler::recv_reply (ACE_Time_Request
&reply
)
235 ACE_TRACE ("ACE_TS_Clerk_Handler::recv_reply");
236 const int bytes_expected
= reply
.size ();
238 // Since Time_Request messages are fixed size, read the entire
239 // message in one go.
// The bytes are received straight into the storage of `reply`.
240 ssize_t n
= this->peer ().recv ((void *) &reply
, bytes_expected
);
242 if (n
!= bytes_expected
)
247 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("****************** recv_reply returned -1\n")));
// NOTE(review): n is an ssize_t but is formatted with %d -- confirm the
// specifier matches on platforms where ssize_t is wider than int.
250 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p got %d bytes, expected %d bytes\n"),
251 ACE_TEXT ("recv failed"), n
, bytes_expected
));
254 // We've shutdown unexpectedly
259 else if (reply
.decode () == -1) // Decode the request into host byte order.
260 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
261 ACE_TEXT ("decode failed")), -1);
// Copies the currently cached time info out through time_info, then
// encodes and sends a new TIME_UPDATE request to the peer.
// sequence_num is recorded locally so that handle_input() can tag the
// matching reply. Returns -1 if encoding fails.
267 ACE_TS_Clerk_Handler::send_request (ACE_UINT32 sequence_num
, ACE_Time_Info
&time_info
)
269 ACE_TRACE ("ACE_TS_Clerk_Handler::send_request");
273 // Update current sequence number
274 this->cur_sequence_num_
= sequence_num
;
276 // First update the current time info.
// These are the values cached by the most recent handle_input() (or the
// zeros set by the constructor).
277 time_info
.delta_time_
= this->time_info_
.delta_time_
;
278 time_info
.sequence_num_
= this->time_info_
.sequence_num_
;
280 // Now prepare a new time update request
281 ACE_Time_Request
request (ACE_Time_Request::TIME_UPDATE
, 0, 0);
283 if ((length
= request
.encode (buffer
)) == -1)
284 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
285 ACE_TEXT ("encode failed")), -1);
287 // Compute start time of sending request (needed to compute
// the round-trip delay in handle_input()).
289 this->start_time_
= ACE_OS::time (0);
// Anything short of the full encoded length counts as a failed send.
292 if (this->peer ().send_n (buffer
, length
) != length
)
293 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
294 ACE_TEXT ("send_n failed")),
// Constructor. Uses the default timeout, leaves blocking semantics off
// (0 selects ACE_Synch_Options::asynch in init()), starts the sequence
// counter at 0 and fills in poolname_ with the backing store name.
300 ACE_TS_Clerk_Processor::ACE_TS_Clerk_Processor ()
301 : timeout_ (ACE_DEFAULT_TIMEOUT
),
302 blocking_semantics_ (0),
303 cur_sequence_num_ (0)
305 #if defined (ACE_DEFAULT_BACKING_STORE)
306 // Use the configured backing store name as the pool name.
307 ACE_OS::strcpy (this->poolname_
,
308 ACE_DEFAULT_BACKING_STORE
);
309 #else /* ACE_DEFAULT_BACKING_STORE */
// Otherwise build the name from the temporary directory, leaving room
// for the file name appended below.
310 if (ACE::get_temp_dir (this->poolname_
,
311 MAXPATHLEN
- 17) == -1) // -17 for ace-malloc-XXXXXX
313 ACE_ERROR ((LM_ERROR
,
314 ACE_TEXT ("Temporary path too long, ")
315 ACE_TEXT ("defaulting to current directory\n")));
// An empty prefix leaves only the file name in poolname_.
316 this->poolname_
[0] = 0;
319 // Add the filename to the end
320 ACE_OS::strcat (this->poolname_
, ACE_TEXT ("ace-malloc-XXXXXX"));
322 #endif /* ACE_DEFAULT_BACKING_STORE */
// Creates the allocator over poolname_ and locates, or creates and
// binds, the record named ACE_DEFAULT_TIME_SERVER_STR. The record holds
// two time_t values; system_time_ is pointed at them (first: delta
// time, second: last local time) and both are then initialised.
326 ACE_TS_Clerk_Processor::alloc ()
328 ACE_TRACE ("ACE_TS_Clerk_Processor::alloc");
329 ACE_NEW (this->shmem_
, ALLOCATOR (this->poolname_
));
333 // Only create the state if it doesn't already exist.
334 if (this->shmem_
->find (ACE_DEFAULT_TIME_SERVER_STR
, temp
) == -1)
336 // Allocate the space out of shared memory for the system time entry
// NOTE(review): no check of the malloc() result is visible here --
// confirm a null return cannot reach the dereferences below.
337 temp
= (this->shmem_
->malloc (2 * sizeof (time_t)));
339 // Give it a name binding
340 this->shmem_
->bind (ACE_DEFAULT_TIME_SERVER_STR
, temp
);
343 // Set up pointers. Note that we add one to get to the second
344 // field in the structure
345 time_t *time_p
= (time_t *)temp
;
346 this->system_time_
.delta_time_
= time_p
;
347 this->system_time_
.last_local_time_
= time_p
+ 1;
// Start with a zero delta and the current local time.
350 *(this->system_time_
.delta_time_
) = 0;
351 *(this->system_time_
.last_local_time_
) = ACE_OS::time (0);
354 // Query the servers for the latest time
// Timer callback: forwards to update_time() and returns its result. The
// time value argument is unnamed.
356 ACE_TS_Clerk_Processor::handle_timeout (const ACE_Time_Value
&,
359 ACE_TRACE ("ACE_TS_Clerk_Processor::handle_timeout");
360 return this->update_time ();
// Runs one update round: calls send_request() on every ESTABLISHED
// handler, sums the delta times whose sequence number matches the
// previous round, and writes the result through system_time_ -- the
// average (total_delta / count) or 0 -- followed by the current local
// time.
364 ACE_TS_Clerk_Processor::update_time ()
366 ACE_TRACE ("ACE_TS_Clerk_Processor::update_time");
// Results handed back by send_request() below belong to the round that
// used the sequence number in effect before the increment.
367 ACE_UINT32 expected_sequence_num
= this->cur_sequence_num_
;
369 // Increment sequence number
370 this->cur_sequence_num_
++;
373 time_t total_delta
= 0;
374 ACE_Time_Info time_info
;
376 // Call send_request() on all handlers
377 ACE_TS_Clerk_Handler
**handler
= 0;
379 for (HANDLER_SET_ITERATOR
set_iterator (this->handler_set_
);
380 set_iterator
.next (handler
) != 0;
381 set_iterator
.advance ())
// Handlers that are not currently connected are skipped.
383 if ((*handler
)->state () == ACE_TS_Clerk_Handler::ESTABLISHED
)
385 if ((*handler
)->send_request (this->cur_sequence_num_
, time_info
) == -1)
387 // Check if sequence numbers match; otherwise discard
// An expected sequence number of 0 never matches.
388 else if (expected_sequence_num
!= 0 &&
389 time_info
.sequence_num_
== expected_sequence_num
)
// NOTE(review): delta_time_ is assigned from a time_t in handle_input()
// but is formatted with %d here -- confirm the specifier matches its
// type.
392 ACE_DEBUG ((LM_DEBUG
,
393 ACE_TEXT ("[%d] Delta time: %d\n"),
394 count
, time_info
.delta_time_
));
396 // #### Can check here if delta value falls within a threshold ####
397 total_delta
+= time_info
.delta_time_
;
401 // Update system_time_ using average of times obtained from all the servers.
402 // Note that we are keeping two things in shared memory: the delta
403 // time (difference between our system clock and the local clock),
404 // and the last local time
407 // At least one server is out there
408 *(this->system_time_
.delta_time_
) = total_delta
/count
;
412 // No servers are out there (or this is the first time around
413 // computing the time) so set delta time to zero. This
414 // would mean that clients would use the actual local system time.
415 *(this->system_time_
.delta_time_
) = 0;
417 // Update the last local time
418 *(this->system_time_
.last_local_time_
) = ACE_OS::time (0);
// The delta is narrowed to int for logging only.
420 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Average delta time: %d\n"),
421 (int)(*(this->system_time_
.delta_time_
))));
// Shuts the clerk down: cancels the update timer if one was scheduled,
// marks non-IDLE handlers as DISCONNECTING, destroys the handlers,
// removes the backing store and finally calls the ACE_Connector base
// class fini().
427 ACE_TS_Clerk_Processor::fini ()
429 ACE_TRACE ("ACE_TS_Clerk_Processor::fini");
// timer_id_ holds the value returned by schedule_timer() in init(); -1
// is treated as "no timer".
432 if (this->timer_id_
!= -1)
433 ACE_Reactor::instance ()->cancel_timer (this->timer_id_
);
435 // Destroy all the handlers
436 ACE_TS_Clerk_Handler
**handler
= 0;
438 for (HANDLER_SET_ITERATOR
set_iterator (this->handler_set_
);
439 set_iterator
.next (handler
) != 0;
440 set_iterator
.advance ())
442 if ((*handler
)->state () != ACE_TS_Clerk_Handler::IDLE
)
443 // Mark state as DISCONNECTING so we don't try to reconnect...
444 (*handler
)->state (ACE_TS_Clerk_Handler::DISCONNECTING
);
446 // Deallocate resources.
447 (*handler
)->destroy (); // Will trigger a delete
450 // Remove the backing store
451 this->shmem_
->remove ();
// Let the connector base class finish its own shutdown.
453 ACE_Connector
<ACE_TS_Clerk_Handler
, ACE_SOCK_CONNECTOR
>::fini ();
// info() is a const member taking an unnamed ACE_TCHAR** and size_t;
// the visible code only traces the call.
459 ACE_TS_Clerk_Processor::info (ACE_TCHAR
**, size_t) const
461 ACE_TRACE ("ACE_TS_Clerk_Processor::info");
// Initialises the clerk: parses the arguments, ignores SIGPIPE on
// non-Windows builds, initiates a connection for every handler in
// handler_set_ (asynchronously unless blocking_semantics_ is set) and
// schedules the update timer, keeping its id in timer_id_.
466 ACE_TS_Clerk_Processor::init (int argc
, ACE_TCHAR
*argv
[])
468 ACE_TRACE ("ACE_TS_Clerk_Processor::init");
469 // Use the options hook to parse the command line arguments and set
// options.
// NOTE(review): the result of parse_args() is not checked here --
// confirm a usage error is meant to be non-fatal.
471 this->parse_args (argc
, argv
);
475 #if !defined (ACE_WIN32)
476 // Ignore SIGPIPE so each Output_Channel can handle it.
477 ACE_Sig_Action
sig ((ACE_SignalHandler
) SIG_IGN
, SIGPIPE
);
478 ACE_UNUSED_ARG (sig
);
479 #endif /* ACE_WIN32 */
// blocking_semantics_ is 0 by default and set to 1 by parse_args().
481 ACE_Synch_Options
&synch_options
= this->blocking_semantics_
== 0
482 ? ACE_Synch_Options::asynch
: ACE_Synch_Options::synch
;
484 // Now set up connections to all servers
485 ACE_TS_Clerk_Handler
**handler
= 0;
487 for (HANDLER_SET_ITERATOR
set_iterator (this->handler_set_
);
488 set_iterator
.next (handler
) != 0;
489 set_iterator
.advance ())
491 this->initiate_connection (*handler
, synch_options
);
493 // Now set up timer to receive updates from server
494 // set the timer to go off after timeout value
// timeout_ is passed twice to schedule_timer().
495 this->timer_id_
= ACE_Reactor::instance ()->schedule_timer (this,
497 ACE_Time_Value (this->timeout_
),
498 ACE_Time_Value (this->timeout_
));
// Tries to connect `handler` to its remote address using
// synch_options and updates the handler's state to match the outcome:
// FAILED on a hard connect error, CONNECTING while a connect is still
// in progress, ESTABLISHED on success. Returns -1 if the peer address
// cannot be formatted or a retry timer cannot be scheduled.
503 ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler
*handler
,
504 ACE_Synch_Options
&synch_options
)
506 ACE_TRACE ("ACE_TS_Clerk_Processor::initiate_connection");
// Holds the textual peer address; it is used only in log messages.
507 ACE_TCHAR buf
[MAXHOSTNAMELEN
+ 1];
509 // Mark ourselves as idle so that the various iterators will ignore
510 // us until we are connected/reconnected.
511 handler
->state (ACE_TS_Clerk_Handler::IDLE
);
513 if (handler
->remote_addr ().addr_to_string (buf
, MAXHOSTNAMELEN
) == -1)
514 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
515 ACE_TEXT ("can't obtain peer's address")), -1);
517 // Establish connection with the server.
518 if (this->connect (handler
,
519 handler
->remote_addr (),
520 synch_options
) == -1)
// EWOULDBLOCK is handled separately from every other errno value.
522 if (errno
!= EWOULDBLOCK
)
524 handler
->state (ACE_TS_Clerk_Handler::FAILED
);
525 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) %p on address %s\n"),
526 ACE_TEXT ("connect"), buf
));
528 // Reschedule ourselves to try and connect again.
529 if (synch_options
[ACE_Synch_Options::USE_REACTOR
])
// The handler's own timeout() provides the retry delay.
531 ACE_Time_Value
const handler_timeout (handler
->timeout ());
532 if (ACE_Reactor::instance ()->schedule_timer (handler
,
534 handler_timeout
) == -1)
535 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
536 ACE_TEXT ("schedule_timer")), -1);
539 // Failures on synchronous connects are reported as errors
540 // so that the caller can decide how to proceed.
// The connect has not completed yet.
545 handler
->state (ACE_TS_Clerk_Handler::CONNECTING
);
546 ACE_DEBUG ((LM_DEBUG
,
547 ACE_TEXT ("(%t) in the process of connecting %s to %s\n"),
548 synch_options
[ACE_Synch_Options::USE_REACTOR
] ?
549 ACE_TEXT ("asynchronously") : ACE_TEXT ("synchronously"),
// The connect succeeded.
555 handler
->state (ACE_TS_Clerk_Handler::ESTABLISHED
);
556 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) connected to %s on %d\n"),
557 buf
, handler
->get_handle ()));
// Parses the service arguments using the option string "h:t:p:b".
// Per the usage text: -h hostname:port adds a server, -t sets the
// timeout and -p sets the pool name; the remaining option turns on
// blocking semantics. An unrecognised option reports the usage text as
// an error.
563 ACE_TS_Clerk_Processor::parse_args (int argc
, ACE_TCHAR
*argv
[])
565 ACE_TRACE ("ACE_TS_Clerk_Processor::parse_args");
566 ACE_INET_Addr server_addr
;
567 ACE_TS_Clerk_Handler
*handler
;
// The trailing 0 is the index at which option scanning starts.
569 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("h:t:p:b"), 0);
571 for (int c
; (c
= get_opt ()) != -1; )
576 // Get the hostname:port and create an ADDR
577 server_addr
.set (get_opt
.opt_arg ());
579 // Create a new handler
580 ACE_NEW_RETURN (handler
,
581 ACE_TS_Clerk_Handler (this, server_addr
),
// init() and update_time() later iterate over this set.
585 this->handler_set_
.insert (handler
);
588 // Get the timeout value
589 this->timeout_
= ACE_OS::atoi (get_opt
.opt_arg ());
// Replace the pool name chosen by the constructor.
// NOTE(review): the bound is the full capacity of poolname_, so an
// argument of that length or longer would leave it unterminated --
// confirm termination is guaranteed.
593 ACE_OS::strncpy (this->poolname_
,
595 sizeof this->poolname_
/ sizeof (ACE_TCHAR
));
598 // Blocking semantics
599 this->blocking_semantics_
= 1;
// NOTE(review): the usage text does not mention the -b option accepted
// by the option string above.
602 ACE_ERROR_RETURN ((LM_ERROR
,
603 ACE_TEXT ("%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n")),
// suspend() takes no arguments; the visible code only traces the call.
611 ACE_TS_Clerk_Processor::suspend ()
613 ACE_TRACE ("ACE_TS_Clerk_Processor::suspend");
// resume() takes no arguments; the visible code only traces the call.
618 ACE_TS_Clerk_Processor::resume ()
620 ACE_TRACE ("ACE_TS_Clerk_Processor::resume");
624 // The following is a "Factory" used by the ACE_Service_Config and
625 // svc.conf file to dynamically initialize the state of the TS_Clerk.
// The macro is given the ACE_TS_Clerk_Processor class name.
627 ACE_SVC_FACTORY_DEFINE (ACE_TS_Clerk_Processor
)