Update NEWS
[ACE_TAO.git] / ACE / netsvcs / lib / TS_Clerk_Handler.cpp
blob8b56b5f179035349f79fb4b5eec41971bc5caa27
1 #include "ace/Get_Opt.h"
2 #include "TS_Clerk_Handler.h"
3 #include "ace/Lib_Find.h"
4 #include "ace/Signal.h"
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/OS_NS_string.h"
7 #include "ace/OS_NS_time.h"
8 #include "ace/os_include/os_netdb.h"
10 ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor *processor,
11 ACE_INET_Addr &addr)
12 : state_ (ACE_TS_Clerk_Handler::IDLE),
13 timeout_ (ACE_DEFAULT_TIMEOUT),
14 max_timeout_ (ACE_TS_Clerk_Handler::MAX_RETRY_TIMEOUT),
15 remote_addr_ (addr),
16 processor_ (processor)
18 ACE_TRACE ("ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler");
19 this->time_info_.delta_time_ = 0;
20 this->time_info_.sequence_num_ = 0;
23 // Set the connection state
24 void
25 ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state)
27 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
28 this->state_ = state;
31 // Get the connection state
32 ACE_TS_Clerk_Handler::State
33 ACE_TS_Clerk_Handler::state (void)
35 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
36 return this->state_;
39 // Sets the timeout delay.
40 void
41 ACE_TS_Clerk_Handler::timeout (long to)
43 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
44 if (to > this->max_timeout_)
45 to = this->max_timeout_;
47 this->timeout_ = to;
50 // Recalculate the current retry timeout delay using exponential
51 // backoff. Returns the original timeout (i.e., before the
52 // recalculation).
53 long
54 ACE_TS_Clerk_Handler::timeout (void)
56 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
57 long old_timeout = this->timeout_;
58 this->timeout_ *= 2;
60 if (this->timeout_ > this->max_timeout_)
61 this->timeout_ = this->max_timeout_;
63 return old_timeout;
66 // This is called when a <send> to the logging server fails...
68 int
69 ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
71 return -1;
74 // Set the max timeout delay.
75 void
76 ACE_TS_Clerk_Handler::max_timeout (long mto)
78 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
79 this->max_timeout_ = mto;
82 // Gets the max timeout delay.
83 long
84 ACE_TS_Clerk_Handler::max_timeout (void)
86 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
87 return this->max_timeout_;
90 int
91 ACE_TS_Clerk_Handler::open (void *)
93 ACE_TRACE ("ACE_TS_Clerk_Handler::open");
94 ACE_INET_Addr server_addr;
96 // Set connection state as established
97 this->state (ACE_TS_Clerk_Handler::ESTABLISHED);
99 // Register ourselves to receive SIGPIPE so we can attempt
100 // reconnections.
101 #if !defined (ACE_WIN32)
102 if (ACE_Reactor::instance ()->register_handler (SIGPIPE, this) == -1)
103 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%n: %p\n"),
104 ACE_TEXT ("register_handler (SIGPIPE)")), -1);
105 #endif /* ACE_WIN32 */
107 // Register ourselves with the reactor to receive input
108 if (ACE_Reactor::instance ()->register_handler (this->get_handle (),
109 this,
110 ACE_Event_Handler::READ_MASK |
111 ACE_Event_Handler::EXCEPT_MASK) == -1)
112 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%n: %p\n"),
113 ACE_TEXT ("register_handler (this)")));
115 // Figure out what remote port we're really bound to.
116 else if (this->peer ().get_remote_addr (server_addr) == -1)
117 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
118 ACE_TEXT ("get_remote_addr")),
119 -1);
121 ACE_DEBUG ((LM_DEBUG,
122 ACE_TEXT ("TS Clerk Daemon connected to port %d on handle %d\n"),
123 server_addr.get_port_number (),
124 this->peer ().get_handle ()));
126 return 0;
129 ACE_HANDLE
130 ACE_TS_Clerk_Handler::get_handle (void) const
132 ACE_TRACE ("ACE_TS_Clerk_Handler::get_handle");
133 return this->peer().get_handle ();
137 ACE_TS_Clerk_Handler::handle_close (ACE_HANDLE,
138 ACE_Reactor_Mask mask)
140 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_close");
141 ACE_UNUSED_ARG (mask);
143 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) shutting down on handle %d\n"),
144 this->get_handle ()));
146 return this->reinitiate_connection ();
150 ACE_TS_Clerk_Handler::reinitiate_connection (void)
152 ACE_TRACE ("ACE_TS_Clerk_Handler::reinitiate_connection");
153 // Skip over deactivated descriptors.
155 // Set state to connecting so that we don't try to send anything
156 // using this handler
157 this->state (ACE_TS_Clerk_Handler::CONNECTING);
158 if (this->get_handle () != ACE_INVALID_HANDLE)
160 ACE_DEBUG ((LM_DEBUG,
161 ACE_TEXT ("(%t) Scheduling reinitiation of connection\n")));
163 // Reschedule ourselves to try and connect again.
164 ACE_Time_Value const timeout (this->timeout ());
165 if (ACE_Reactor::instance ()->schedule_timer (this, 0,
166 timeout) == -1)
167 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
168 ACE_TEXT ("schedule_timer")), -1);
170 return 0;
173 // Receive a time update from a server
175 ACE_TS_Clerk_Handler::handle_input (ACE_HANDLE)
177 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_input");
178 // We're getting a time update message from a server
179 ACE_Time_Request reply;
180 if (this->recv_reply (reply) != 0)
181 return -1;
182 else
184 // Get current local time
185 time_t local_time = ACE_OS::time (0);
187 // Compure delta time (difference between current local time and
188 // system time obtained from the server)
189 time_t t = reply.time () - local_time;
191 // Compute round trip delay and adjust time accordingly
192 time_t one_way_time = (local_time - this->start_time_)/2;
193 t += one_way_time;
195 // Now update time info (to be retrieved by Clerk_Processor)
196 this->time_info_.delta_time_ = t;
197 this->time_info_.sequence_num_ = this->cur_sequence_num_;
199 return 0;
202 // Restart connection asynchronously when timeout occurs.
204 ACE_TS_Clerk_Handler::handle_timeout (const ACE_Time_Value &,
205 const void *)
207 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_timeout");
208 ACE_DEBUG ((LM_DEBUG,
209 ACE_TEXT ("(%t) attempting to reconnect to server with timeout = %d\n"),
210 this->timeout_));
212 // Close down peer to reclaim descriptor if need be. Note this is
213 // necessary to reconnect.
214 this->peer ().close ();
216 return this->processor_->initiate_connection (this, ACE_Synch_Options::asynch);
219 void
220 ACE_TS_Clerk_Handler::remote_addr (ACE_INET_Addr &addr)
222 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
223 this->remote_addr_ = addr;
226 ACE_INET_Addr &
227 ACE_TS_Clerk_Handler::remote_addr (void)
229 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
230 return this->remote_addr_;
234 ACE_TS_Clerk_Handler::recv_reply (ACE_Time_Request &reply)
236 ACE_TRACE ("ACE_TS_Clerk_Handler::recv_reply");
237 const int bytes_expected = reply.size ();
239 // Since Time_Request messages are fixed size, read the entire
240 // message in one go.
241 ssize_t n = this->peer ().recv ((void *) &reply, bytes_expected);
243 if (n != bytes_expected)
245 switch (n)
247 case -1:
248 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("****************** recv_reply returned -1\n")));
249 // FALLTHROUGH
250 default:
251 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p got %d bytes, expected %d bytes\n"),
252 ACE_TEXT ("recv failed"), n, bytes_expected));
253 // FALLTHROUGH
254 case 0:
255 // We've shutdown unexpectedly
256 return -1;
257 // NOTREACHED
260 else if (reply.decode () == -1) // Decode the request into host byte order.
261 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
262 ACE_TEXT ("decode failed")), -1);
263 return 0;
268 ACE_TS_Clerk_Handler::send_request (ACE_UINT32 sequence_num, ACE_Time_Info &time_info)
270 ACE_TRACE ("ACE_TS_Clerk_Handler::send_request");
271 void *buffer;
272 ssize_t length;
274 // Update current sequence number
275 this->cur_sequence_num_ = sequence_num;
277 // First update the current time info.
278 time_info.delta_time_ = this->time_info_.delta_time_;
279 time_info.sequence_num_ = this->time_info_.sequence_num_;
281 // Now prepare a new time update request
282 ACE_Time_Request request (ACE_Time_Request::TIME_UPDATE, 0, 0);
284 if ((length = request.encode (buffer)) == -1)
285 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
286 ACE_TEXT ("encode failed")), -1);
288 // Compute start time of sending request (needed to compute
289 // roundtrip delay)
290 this->start_time_ = ACE_OS::time (0);
292 // Send the request
293 if (this->peer ().send_n (buffer, length) != length)
294 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
295 ACE_TEXT ("send_n failed")),
296 -1);
298 return 0;
301 ACE_TS_Clerk_Processor::ACE_TS_Clerk_Processor ()
302 : timeout_ (ACE_DEFAULT_TIMEOUT),
303 blocking_semantics_ (0),
304 cur_sequence_num_ (0)
306 #if defined (ACE_DEFAULT_BACKING_STORE)
307 // Create a temporary file.
308 ACE_OS::strcpy (this->poolname_,
309 ACE_DEFAULT_BACKING_STORE);
310 #else /* ACE_DEFAULT_BACKING_STORE */
311 if (ACE::get_temp_dir (this->poolname_,
312 MAXPATHLEN - 17) == -1) // -17 for ace-malloc-XXXXXX
314 ACE_ERROR ((LM_ERROR,
315 ACE_TEXT ("Temporary path too long, ")
316 ACE_TEXT ("defaulting to current directory\n")));
317 this->poolname_[0] = 0;
320 // Add the filename to the end
321 ACE_OS::strcat (this->poolname_, ACE_TEXT ("ace-malloc-XXXXXX"));
323 #endif /* ACE_DEFAULT_BACKING_STORE */
326 void
327 ACE_TS_Clerk_Processor::alloc (void)
329 ACE_TRACE ("ACE_TS_Clerk_Processor::alloc");
330 ACE_NEW (this->shmem_, ALLOCATOR (this->poolname_));
332 void *temp = 0;
334 // Only create the state if it doesn't already exist.
335 if (this->shmem_->find (ACE_DEFAULT_TIME_SERVER_STR, temp) == -1)
337 // Allocate the space out of shared memory for the system time entry
338 temp = (this->shmem_->malloc (2 * sizeof (time_t)));
340 // Give it a name binding
341 this->shmem_->bind (ACE_DEFAULT_TIME_SERVER_STR, temp);
344 // Set up pointers. Note that we add one to get to the second
345 // field in the structure
346 time_t *time_p = (time_t *)temp;
347 this->system_time_.delta_time_ = time_p;
348 this->system_time_.last_local_time_ = time_p + 1;
350 // Initialize
351 *(this->system_time_.delta_time_) = 0;
352 *(this->system_time_.last_local_time_) = ACE_OS::time (0);
355 // Query the servers for the latest time
357 ACE_TS_Clerk_Processor::handle_timeout (const ACE_Time_Value &,
358 const void *)
360 ACE_TRACE ("ACE_TS_Clerk_Processor::handle_timeout");
361 return this->update_time ();
365 ACE_TS_Clerk_Processor::update_time ()
367 ACE_TRACE ("ACE_TS_Clerk_Processor::update_time");
368 ACE_UINT32 expected_sequence_num = this->cur_sequence_num_;
370 // Increment sequence number
371 this->cur_sequence_num_++;
373 int count = 0;
374 time_t total_delta = 0;
375 ACE_Time_Info time_info;
377 // Call send_request() on all handlers
378 ACE_TS_Clerk_Handler **handler = 0;
380 for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
381 set_iterator.next (handler) != 0;
382 set_iterator.advance ())
384 if ((*handler)->state () == ACE_TS_Clerk_Handler::ESTABLISHED)
386 if ((*handler)->send_request (this->cur_sequence_num_, time_info) == -1)
387 return -1;
388 // Check if sequence numbers match; otherwise discard
389 else if (expected_sequence_num != 0 &&
390 time_info.sequence_num_ == expected_sequence_num)
392 count++;
393 ACE_DEBUG ((LM_DEBUG,
394 ACE_TEXT ("[%d] Delta time: %d\n"),
395 count, time_info.delta_time_));
397 // #### Can check here if delta value falls within a threshold ####
398 total_delta += time_info.delta_time_;
402 // Update system_time_ using average of times obtained from all the servers.
403 // Note that we are keeping two things in shared memory: the delta
404 // time (difference between our system clock and the local clock),
405 // and the last local time
406 if (count > 0)
408 // At least one server is out there
409 *(this->system_time_.delta_time_) = total_delta/count;
411 else
413 // No servers are out there (or this is the first time around
414 // computing the time) so set delta time to zero. This
415 // would mean that clients would use the actual local system time.
416 *(this->system_time_.delta_time_) = 0;
418 // Update the last local time
419 *(this->system_time_.last_local_time_) = ACE_OS::time (0);
421 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Average delta time: %d\n"),
422 (int)(*(this->system_time_.delta_time_))));
423 return 0;
428 ACE_TS_Clerk_Processor::fini (void)
430 ACE_TRACE ("ACE_TS_Clerk_Processor::fini");
432 // Cancel the timer
433 if (this->timer_id_ != -1)
434 ACE_Reactor::instance ()->cancel_timer (this->timer_id_);
436 // Destroy all the handlers
437 ACE_TS_Clerk_Handler **handler = 0;
439 for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
440 set_iterator.next (handler) != 0;
441 set_iterator.advance ())
443 if ((*handler)->state () != ACE_TS_Clerk_Handler::IDLE)
444 // Mark state as DISCONNECTING so we don't try to reconnect...
445 (*handler)->state (ACE_TS_Clerk_Handler::DISCONNECTING);
447 // Deallocate resources.
448 (*handler)->destroy (); // Will trigger a delete
451 // Remove the backing store
452 this->shmem_->remove ();
454 ACE_Connector <ACE_TS_Clerk_Handler, ACE_SOCK_CONNECTOR>::fini ();
456 return 0;
460 ACE_TS_Clerk_Processor::info (ACE_TCHAR **, size_t) const
462 ACE_TRACE ("ACE_TS_Clerk_Processor::info");
463 return 0;
467 ACE_TS_Clerk_Processor::init (int argc, ACE_TCHAR *argv[])
469 ACE_TRACE ("ACE_TS_Clerk_Processor::init");
470 // Use the options hook to parse the command line arguments and set
471 // options.
472 this->parse_args (argc, argv);
474 this->alloc ();
476 #if !defined (ACE_WIN32)
477 // Ignore SIPPIPE so each Output_Channel can handle it.
478 ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
479 ACE_UNUSED_ARG (sig);
480 #endif /* ACE_WIN32 */
482 ACE_Synch_Options &synch_options = this->blocking_semantics_ == 0
483 ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch;
485 // Now set up connections to all servers
486 ACE_TS_Clerk_Handler **handler = 0;
488 for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
489 set_iterator.next (handler) != 0;
490 set_iterator.advance ())
492 this->initiate_connection (*handler, synch_options);
494 // Now set up timer to receive updates from server
495 // set the timer to go off after timeout value
496 this->timer_id_ = ACE_Reactor::instance ()->schedule_timer (this,
498 ACE_Time_Value (this->timeout_),
499 ACE_Time_Value (this->timeout_));
500 return 0;
504 ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler *handler,
505 ACE_Synch_Options &synch_options)
507 ACE_TRACE ("ACE_TS_Clerk_Processor::initiate_connection");
508 ACE_TCHAR buf[MAXHOSTNAMELEN + 1];
510 // Mark ourselves as idle so that the various iterators will ignore
511 // us until we are connected/reconnected.
512 handler->state (ACE_TS_Clerk_Handler::IDLE);
514 if (handler->remote_addr ().addr_to_string (buf, MAXHOSTNAMELEN) == -1)
515 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
516 ACE_TEXT ("can't obtain peer's address")), -1);
518 // Establish connection with the server.
519 if (this->connect (handler,
520 handler->remote_addr (),
521 synch_options) == -1)
523 if (errno != EWOULDBLOCK)
525 handler->state (ACE_TS_Clerk_Handler::FAILED);
526 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) %p on address %s\n"),
527 ACE_TEXT ("connect"), buf));
529 // Reschedule ourselves to try and connect again.
530 if (synch_options[ACE_Synch_Options::USE_REACTOR])
532 ACE_Time_Value const handler_timeout (handler->timeout ());
533 if (ACE_Reactor::instance ()->schedule_timer (handler,
535 handler_timeout) == -1)
536 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
537 ACE_TEXT ("schedule_timer")), -1);
539 else
540 // Failures on synchronous connects are reported as errors
541 // so that the caller can decide how to proceed.
542 return -1;
544 else
546 handler->state (ACE_TS_Clerk_Handler::CONNECTING);
547 ACE_DEBUG ((LM_DEBUG,
548 ACE_TEXT ("(%t) in the process of connecting %s to %s\n"),
549 synch_options[ACE_Synch_Options::USE_REACTOR] ?
550 ACE_TEXT ("asynchronously") : ACE_TEXT ("synchronously"),
551 buf));
554 else
556 handler->state (ACE_TS_Clerk_Handler::ESTABLISHED);
557 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) connected to %s on %d\n"),
558 buf, handler->get_handle ()));
560 return 0;
564 ACE_TS_Clerk_Processor::parse_args (int argc, ACE_TCHAR *argv[])
566 ACE_TRACE ("ACE_TS_Clerk_Processor::parse_args");
567 ACE_INET_Addr server_addr;
568 ACE_TS_Clerk_Handler *handler;
570 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:t:p:b"), 0);
572 for (int c; (c = get_opt ()) != -1; )
574 switch (c)
576 case 'h':
577 // Get the hostname:port and create an ADDR
578 server_addr.set (get_opt.opt_arg ());
580 // Create a new handler
581 ACE_NEW_RETURN (handler,
582 ACE_TS_Clerk_Handler (this, server_addr),
583 -1);
585 // Cache the handler
586 this->handler_set_.insert (handler);
587 break;
588 case 't':
589 // Get the timeout value
590 this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ());
591 break;
592 case 'p':
593 // Get the poolname
594 ACE_OS::strncpy (this->poolname_,
595 get_opt.opt_arg (),
596 sizeof this->poolname_ / sizeof (ACE_TCHAR));
597 break;
598 case 'b':
599 // Blocking semantics
600 this->blocking_semantics_ = 1;
601 break;
602 default:
603 ACE_ERROR_RETURN ((LM_ERROR,
604 ACE_TEXT ("%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n")),
605 -1);
608 return 0;
612 ACE_TS_Clerk_Processor::suspend (void)
614 ACE_TRACE ("ACE_TS_Clerk_Processor::suspend");
615 return 0;
619 ACE_TS_Clerk_Processor::resume (void)
621 ACE_TRACE ("ACE_TS_Clerk_Processor::resume");
622 return 0;
625 // The following is a "Factory" used by the ACE_Service_Config and
626 // svc.conf file to dynamically initialize the state of the TS_Clerk.
628 ACE_SVC_FACTORY_DEFINE (ACE_TS_Clerk_Processor)