Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / netsvcs / lib / TS_Clerk_Handler.cpp
blob0eb0a41db390d6ed0d934c5a6d411532cb993b76
1 #include "ace/Get_Opt.h"
2 #include "TS_Clerk_Handler.h"
3 #include "ace/Lib_Find.h"
4 #include "ace/Signal.h"
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/OS_NS_string.h"
7 #include "ace/OS_NS_time.h"
8 #include "ace/os_include/os_netdb.h"
10 ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor *processor,
11 ACE_INET_Addr &addr)
12 : state_ (ACE_TS_Clerk_Handler::IDLE),
13 timeout_ (ACE_DEFAULT_TIMEOUT),
14 max_timeout_ (ACE_TS_Clerk_Handler::MAX_RETRY_TIMEOUT),
15 remote_addr_ (addr),
16 processor_ (processor)
18 ACE_TRACE ("ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler");
19 this->time_info_.delta_time_ = 0;
20 this->time_info_.sequence_num_ = 0;
23 // Set the connection state
24 void
25 ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state)
27 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
28 this->state_ = state;
31 // Get the connection state
32 ACE_TS_Clerk_Handler::State
33 ACE_TS_Clerk_Handler::state ()
35 ACE_TRACE ("ACE_TS_Clerk_Handler::state");
36 return this->state_;
39 // Sets the timeout delay.
40 void
41 ACE_TS_Clerk_Handler::timeout (long to)
43 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
44 if (to > this->max_timeout_)
45 to = this->max_timeout_;
47 this->timeout_ = to;
50 // Recalculate the current retry timeout delay using exponential
51 // backoff. Returns the original timeout (i.e., before the
52 // recalculation).
53 long
54 ACE_TS_Clerk_Handler::timeout ()
56 ACE_TRACE ("ACE_TS_Clerk_Handler::timeout");
57 long old_timeout = this->timeout_;
58 this->timeout_ *= 2;
60 if (this->timeout_ > this->max_timeout_)
61 this->timeout_ = this->max_timeout_;
63 return old_timeout;
66 // This is called when a <send> to the logging server fails...
67 int
68 ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
70 return -1;
73 // Set the max timeout delay.
74 void
75 ACE_TS_Clerk_Handler::max_timeout (long mto)
77 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
78 this->max_timeout_ = mto;
81 // Gets the max timeout delay.
82 long
83 ACE_TS_Clerk_Handler::max_timeout ()
85 ACE_TRACE ("ACE_TS_Clerk_Handler::max_timeout");
86 return this->max_timeout_;
89 int
90 ACE_TS_Clerk_Handler::open (void *)
92 ACE_TRACE ("ACE_TS_Clerk_Handler::open");
93 ACE_INET_Addr server_addr;
95 // Set connection state as established
96 this->state (ACE_TS_Clerk_Handler::ESTABLISHED);
98 // Register ourselves to receive SIGPIPE so we can attempt
99 // reconnections.
100 #if !defined (ACE_WIN32)
101 if (ACE_Reactor::instance ()->register_handler (SIGPIPE, this) == -1)
102 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%n: %p\n"),
103 ACE_TEXT ("register_handler (SIGPIPE)")), -1);
104 #endif /* ACE_WIN32 */
106 // Register ourselves with the reactor to receive input
107 if (ACE_Reactor::instance ()->register_handler (this->get_handle (),
108 this,
109 ACE_Event_Handler::READ_MASK |
110 ACE_Event_Handler::EXCEPT_MASK) == -1)
111 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%n: %p\n"),
112 ACE_TEXT ("register_handler (this)")));
114 // Figure out what remote port we're really bound to.
115 else if (this->peer ().get_remote_addr (server_addr) == -1)
116 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
117 ACE_TEXT ("get_remote_addr")),
118 -1);
120 ACE_DEBUG ((LM_DEBUG,
121 ACE_TEXT ("TS Clerk Daemon connected to port %d on handle %d\n"),
122 server_addr.get_port_number (),
123 this->peer ().get_handle ()));
125 return 0;
128 ACE_HANDLE
129 ACE_TS_Clerk_Handler::get_handle () const
131 ACE_TRACE ("ACE_TS_Clerk_Handler::get_handle");
132 return this->peer().get_handle ();
136 ACE_TS_Clerk_Handler::handle_close (ACE_HANDLE,
137 ACE_Reactor_Mask mask)
139 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_close");
140 ACE_UNUSED_ARG (mask);
142 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) shutting down on handle %d\n"),
143 this->get_handle ()));
145 return this->reinitiate_connection ();
149 ACE_TS_Clerk_Handler::reinitiate_connection ()
151 ACE_TRACE ("ACE_TS_Clerk_Handler::reinitiate_connection");
152 // Skip over deactivated descriptors.
154 // Set state to connecting so that we don't try to send anything
155 // using this handler
156 this->state (ACE_TS_Clerk_Handler::CONNECTING);
157 if (this->get_handle () != ACE_INVALID_HANDLE)
159 ACE_DEBUG ((LM_DEBUG,
160 ACE_TEXT ("(%t) Scheduling reinitiation of connection\n")));
162 // Reschedule ourselves to try and connect again.
163 ACE_Time_Value const timeout (this->timeout ());
164 if (ACE_Reactor::instance ()->schedule_timer (this, 0,
165 timeout) == -1)
166 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
167 ACE_TEXT ("schedule_timer")), -1);
169 return 0;
172 // Receive a time update from a server
174 ACE_TS_Clerk_Handler::handle_input (ACE_HANDLE)
176 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_input");
177 // We're getting a time update message from a server
178 ACE_Time_Request reply;
179 if (this->recv_reply (reply) != 0)
180 return -1;
181 else
183 // Get current local time
184 time_t local_time = ACE_OS::time (0);
186 // Compure delta time (difference between current local time and
187 // system time obtained from the server)
188 time_t t = reply.time () - local_time;
190 // Compute round trip delay and adjust time accordingly
191 time_t one_way_time = (local_time - this->start_time_)/2;
192 t += one_way_time;
194 // Now update time info (to be retrieved by Clerk_Processor)
195 this->time_info_.delta_time_ = t;
196 this->time_info_.sequence_num_ = this->cur_sequence_num_;
198 return 0;
201 // Restart connection asynchronously when timeout occurs.
203 ACE_TS_Clerk_Handler::handle_timeout (const ACE_Time_Value &,
204 const void *)
206 ACE_TRACE ("ACE_TS_Clerk_Handler::handle_timeout");
207 ACE_DEBUG ((LM_DEBUG,
208 ACE_TEXT ("(%t) attempting to reconnect to server with timeout = %d\n"),
209 this->timeout_));
211 // Close down peer to reclaim descriptor if need be. Note this is
212 // necessary to reconnect.
213 this->peer ().close ();
215 return this->processor_->initiate_connection (this, ACE_Synch_Options::asynch);
218 void
219 ACE_TS_Clerk_Handler::remote_addr (ACE_INET_Addr &addr)
221 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
222 this->remote_addr_ = addr;
225 ACE_INET_Addr &
226 ACE_TS_Clerk_Handler::remote_addr ()
228 ACE_TRACE ("ACE_TS_Clerk_Handler::remote_addr");
229 return this->remote_addr_;
233 ACE_TS_Clerk_Handler::recv_reply (ACE_Time_Request &reply)
235 ACE_TRACE ("ACE_TS_Clerk_Handler::recv_reply");
236 const int bytes_expected = reply.size ();
238 // Since Time_Request messages are fixed size, read the entire
239 // message in one go.
240 ssize_t n = this->peer ().recv ((void *) &reply, bytes_expected);
242 if (n != bytes_expected)
244 switch (n)
246 case -1:
247 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("****************** recv_reply returned -1\n")));
248 ACE_FALLTHROUGH;
249 default:
250 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p got %d bytes, expected %d bytes\n"),
251 ACE_TEXT ("recv failed"), n, bytes_expected));
252 ACE_FALLTHROUGH;
253 case 0:
254 // We've shutdown unexpectedly
255 return -1;
256 // NOTREACHED
259 else if (reply.decode () == -1) // Decode the request into host byte order.
260 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
261 ACE_TEXT ("decode failed")), -1);
262 return 0;
267 ACE_TS_Clerk_Handler::send_request (ACE_UINT32 sequence_num, ACE_Time_Info &time_info)
269 ACE_TRACE ("ACE_TS_Clerk_Handler::send_request");
270 void *buffer;
271 ssize_t length;
273 // Update current sequence number
274 this->cur_sequence_num_ = sequence_num;
276 // First update the current time info.
277 time_info.delta_time_ = this->time_info_.delta_time_;
278 time_info.sequence_num_ = this->time_info_.sequence_num_;
280 // Now prepare a new time update request
281 ACE_Time_Request request (ACE_Time_Request::TIME_UPDATE, 0, 0);
283 if ((length = request.encode (buffer)) == -1)
284 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
285 ACE_TEXT ("encode failed")), -1);
287 // Compute start time of sending request (needed to compute
288 // roundtrip delay)
289 this->start_time_ = ACE_OS::time (0);
291 // Send the request
292 if (this->peer ().send_n (buffer, length) != length)
293 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
294 ACE_TEXT ("send_n failed")),
295 -1);
297 return 0;
300 ACE_TS_Clerk_Processor::ACE_TS_Clerk_Processor ()
301 : timeout_ (ACE_DEFAULT_TIMEOUT),
302 blocking_semantics_ (0),
303 cur_sequence_num_ (0)
305 #if defined (ACE_DEFAULT_BACKING_STORE)
306 // Create a temporary file.
307 ACE_OS::strcpy (this->poolname_,
308 ACE_DEFAULT_BACKING_STORE);
309 #else /* ACE_DEFAULT_BACKING_STORE */
310 if (ACE::get_temp_dir (this->poolname_,
311 MAXPATHLEN - 17) == -1) // -17 for ace-malloc-XXXXXX
313 ACE_ERROR ((LM_ERROR,
314 ACE_TEXT ("Temporary path too long, ")
315 ACE_TEXT ("defaulting to current directory\n")));
316 this->poolname_[0] = 0;
319 // Add the filename to the end
320 ACE_OS::strcat (this->poolname_, ACE_TEXT ("ace-malloc-XXXXXX"));
322 #endif /* ACE_DEFAULT_BACKING_STORE */
325 void
326 ACE_TS_Clerk_Processor::alloc ()
328 ACE_TRACE ("ACE_TS_Clerk_Processor::alloc");
329 ACE_NEW (this->shmem_, ALLOCATOR (this->poolname_));
331 void *temp = 0;
333 // Only create the state if it doesn't already exist.
334 if (this->shmem_->find (ACE_DEFAULT_TIME_SERVER_STR, temp) == -1)
336 // Allocate the space out of shared memory for the system time entry
337 temp = (this->shmem_->malloc (2 * sizeof (time_t)));
339 // Give it a name binding
340 this->shmem_->bind (ACE_DEFAULT_TIME_SERVER_STR, temp);
343 // Set up pointers. Note that we add one to get to the second
344 // field in the structure
345 time_t *time_p = (time_t *)temp;
346 this->system_time_.delta_time_ = time_p;
347 this->system_time_.last_local_time_ = time_p + 1;
349 // Initialize
350 *(this->system_time_.delta_time_) = 0;
351 *(this->system_time_.last_local_time_) = ACE_OS::time (0);
354 // Query the servers for the latest time
356 ACE_TS_Clerk_Processor::handle_timeout (const ACE_Time_Value &,
357 const void *)
359 ACE_TRACE ("ACE_TS_Clerk_Processor::handle_timeout");
360 return this->update_time ();
364 ACE_TS_Clerk_Processor::update_time ()
366 ACE_TRACE ("ACE_TS_Clerk_Processor::update_time");
367 ACE_UINT32 expected_sequence_num = this->cur_sequence_num_;
369 // Increment sequence number
370 this->cur_sequence_num_++;
372 int count = 0;
373 time_t total_delta = 0;
374 ACE_Time_Info time_info;
376 // Call send_request() on all handlers
377 ACE_TS_Clerk_Handler **handler = 0;
379 for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
380 set_iterator.next (handler) != 0;
381 set_iterator.advance ())
383 if ((*handler)->state () == ACE_TS_Clerk_Handler::ESTABLISHED)
385 if ((*handler)->send_request (this->cur_sequence_num_, time_info) == -1)
386 return -1;
387 // Check if sequence numbers match; otherwise discard
388 else if (expected_sequence_num != 0 &&
389 time_info.sequence_num_ == expected_sequence_num)
391 count++;
392 ACE_DEBUG ((LM_DEBUG,
393 ACE_TEXT ("[%d] Delta time: %d\n"),
394 count, time_info.delta_time_));
396 // #### Can check here if delta value falls within a threshold ####
397 total_delta += time_info.delta_time_;
401 // Update system_time_ using average of times obtained from all the servers.
402 // Note that we are keeping two things in shared memory: the delta
403 // time (difference between our system clock and the local clock),
404 // and the last local time
405 if (count > 0)
407 // At least one server is out there
408 *(this->system_time_.delta_time_) = total_delta/count;
410 else
412 // No servers are out there (or this is the first time around
413 // computing the time) so set delta time to zero. This
414 // would mean that clients would use the actual local system time.
415 *(this->system_time_.delta_time_) = 0;
417 // Update the last local time
418 *(this->system_time_.last_local_time_) = ACE_OS::time (0);
420 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Average delta time: %d\n"),
421 (int)(*(this->system_time_.delta_time_))));
422 return 0;
427 ACE_TS_Clerk_Processor::fini ()
429 ACE_TRACE ("ACE_TS_Clerk_Processor::fini");
431 // Cancel the timer
432 if (this->timer_id_ != -1)
433 ACE_Reactor::instance ()->cancel_timer (this->timer_id_);
435 // Destroy all the handlers
436 ACE_TS_Clerk_Handler **handler = 0;
438 for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
439 set_iterator.next (handler) != 0;
440 set_iterator.advance ())
442 if ((*handler)->state () != ACE_TS_Clerk_Handler::IDLE)
443 // Mark state as DISCONNECTING so we don't try to reconnect...
444 (*handler)->state (ACE_TS_Clerk_Handler::DISCONNECTING);
446 // Deallocate resources.
447 (*handler)->destroy (); // Will trigger a delete
450 // Remove the backing store
451 this->shmem_->remove ();
453 ACE_Connector <ACE_TS_Clerk_Handler, ACE_SOCK_CONNECTOR>::fini ();
455 return 0;
459 ACE_TS_Clerk_Processor::info (ACE_TCHAR **, size_t) const
461 ACE_TRACE ("ACE_TS_Clerk_Processor::info");
462 return 0;
466 ACE_TS_Clerk_Processor::init (int argc, ACE_TCHAR *argv[])
468 ACE_TRACE ("ACE_TS_Clerk_Processor::init");
469 // Use the options hook to parse the command line arguments and set
470 // options.
471 this->parse_args (argc, argv);
473 this->alloc ();
475 #if !defined (ACE_WIN32)
476 // Ignore SIPPIPE so each Output_Channel can handle it.
477 ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
478 ACE_UNUSED_ARG (sig);
479 #endif /* ACE_WIN32 */
481 ACE_Synch_Options &synch_options = this->blocking_semantics_ == 0
482 ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch;
484 // Now set up connections to all servers
485 ACE_TS_Clerk_Handler **handler = 0;
487 for (HANDLER_SET_ITERATOR set_iterator (this->handler_set_);
488 set_iterator.next (handler) != 0;
489 set_iterator.advance ())
491 this->initiate_connection (*handler, synch_options);
493 // Now set up timer to receive updates from server
494 // set the timer to go off after timeout value
495 this->timer_id_ = ACE_Reactor::instance ()->schedule_timer (this,
497 ACE_Time_Value (this->timeout_),
498 ACE_Time_Value (this->timeout_));
499 return 0;
503 ACE_TS_Clerk_Processor::initiate_connection (ACE_TS_Clerk_Handler *handler,
504 ACE_Synch_Options &synch_options)
506 ACE_TRACE ("ACE_TS_Clerk_Processor::initiate_connection");
507 ACE_TCHAR buf[MAXHOSTNAMELEN + 1];
509 // Mark ourselves as idle so that the various iterators will ignore
510 // us until we are connected/reconnected.
511 handler->state (ACE_TS_Clerk_Handler::IDLE);
513 if (handler->remote_addr ().addr_to_string (buf, MAXHOSTNAMELEN) == -1)
514 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
515 ACE_TEXT ("can't obtain peer's address")), -1);
517 // Establish connection with the server.
518 if (this->connect (handler,
519 handler->remote_addr (),
520 synch_options) == -1)
522 if (errno != EWOULDBLOCK)
524 handler->state (ACE_TS_Clerk_Handler::FAILED);
525 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) %p on address %s\n"),
526 ACE_TEXT ("connect"), buf));
528 // Reschedule ourselves to try and connect again.
529 if (synch_options[ACE_Synch_Options::USE_REACTOR])
531 ACE_Time_Value const handler_timeout (handler->timeout ());
532 if (ACE_Reactor::instance ()->schedule_timer (handler,
534 handler_timeout) == -1)
535 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
536 ACE_TEXT ("schedule_timer")), -1);
538 else
539 // Failures on synchronous connects are reported as errors
540 // so that the caller can decide how to proceed.
541 return -1;
543 else
545 handler->state (ACE_TS_Clerk_Handler::CONNECTING);
546 ACE_DEBUG ((LM_DEBUG,
547 ACE_TEXT ("(%t) in the process of connecting %s to %s\n"),
548 synch_options[ACE_Synch_Options::USE_REACTOR] ?
549 ACE_TEXT ("asynchronously") : ACE_TEXT ("synchronously"),
550 buf));
553 else
555 handler->state (ACE_TS_Clerk_Handler::ESTABLISHED);
556 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) connected to %s on %d\n"),
557 buf, handler->get_handle ()));
559 return 0;
563 ACE_TS_Clerk_Processor::parse_args (int argc, ACE_TCHAR *argv[])
565 ACE_TRACE ("ACE_TS_Clerk_Processor::parse_args");
566 ACE_INET_Addr server_addr;
567 ACE_TS_Clerk_Handler *handler;
569 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:t:p:b"), 0);
571 for (int c; (c = get_opt ()) != -1; )
573 switch (c)
575 case 'h':
576 // Get the hostname:port and create an ADDR
577 server_addr.set (get_opt.opt_arg ());
579 // Create a new handler
580 ACE_NEW_RETURN (handler,
581 ACE_TS_Clerk_Handler (this, server_addr),
582 -1);
584 // Cache the handler
585 this->handler_set_.insert (handler);
586 break;
587 case 't':
588 // Get the timeout value
589 this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ());
590 break;
591 case 'p':
592 // Get the poolname
593 ACE_OS::strncpy (this->poolname_,
594 get_opt.opt_arg (),
595 sizeof this->poolname_ / sizeof (ACE_TCHAR));
596 break;
597 case 'b':
598 // Blocking semantics
599 this->blocking_semantics_ = 1;
600 break;
601 default:
602 ACE_ERROR_RETURN ((LM_ERROR,
603 ACE_TEXT ("%n:\n[-h hostname:port] [-t timeout] [-p poolname]\n")),
604 -1);
607 return 0;
611 ACE_TS_Clerk_Processor::suspend ()
613 ACE_TRACE ("ACE_TS_Clerk_Processor::suspend");
614 return 0;
618 ACE_TS_Clerk_Processor::resume ()
620 ACE_TRACE ("ACE_TS_Clerk_Processor::resume");
621 return 0;
624 // The following is a "Factory" used by the ACE_Service_Config and
625 // svc.conf file to dynamically initialize the state of the TS_Clerk.
627 ACE_SVC_FACTORY_DEFINE (ACE_TS_Clerk_Processor)