Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / ACE / apps / Gateway / Peer / Peer.cpp
blobd40348c792fb8429c699c870214e8c52942a2369
1 #define ACE_BUILD_SVC_DLL
3 #include "ace/OS_NS_stdio.h"
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_unistd.h"
6 #include "ace/Signal.h"
7 #include "Peer.h"
9 Peer_Handler::Peer_Handler ()
10 : connection_id_ (-1), // Maybe it's better than 0.
11 msg_frag_ (0),
12 total_bytes_ (0)
14 // Set the high water mark of the <ACE_Message_Queue>. This is used
15 // to exert flow control.
16 this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
17 first_time_ = 1; // It will be first time to open Peer_Handler.
20 // Upcall from the <ACE_Acceptor::handle_input> that turns control
21 // over to our application-specific Gateway handler.
23 int
24 Peer_Handler::open (void *a)
26 ACE_DEBUG ((LM_DEBUG,
27 ACE_TEXT ("handle = %d\n"),
28 this->peer ().get_handle ()));
30 // Call down to the base class to activate and register this handler
31 // with an <ACE_Reactor>.
32 if (this->inherited::open (a) == -1)
33 ACE_ERROR_RETURN ((LM_ERROR,
34 ACE_TEXT ("%p\n"),
35 ACE_TEXT ("open")),
36 -1);
38 if (this->peer ().enable (ACE_NONBLOCK) == -1)
39 ACE_ERROR_RETURN ((LM_ERROR,
40 ACE_TEXT ("%p\n"),
41 ACE_TEXT ("enable")),
42 -1);
44 ACE_Time_Value timeout (Options::instance ()->timeout ());
46 // Schedule the time between disconnects. This should really be a
47 // "tunable" parameter.
48 if (ACE_Reactor::instance ()->schedule_timer
49 (this, 0, timeout) == -1)
50 ACE_ERROR ((LM_ERROR,
51 ACE_TEXT ("%p\n"),
52 ACE_TEXT ("schedule_timer")));
54 // If there are events left in the queue, make sure we enable the
55 // <ACE_Reactor> appropriately to get them sent out.
56 if (this->msg_queue ()->is_empty () == 0
57 && ACE_Reactor::instance ()->schedule_wakeup
58 (this, ACE_Event_Handler::WRITE_MASK) == -1)
59 ACE_ERROR_RETURN ((LM_ERROR,
60 ACE_TEXT ("%p\n"),
61 ACE_TEXT ("schedule_wakeup")),
62 -1);
64 // First action is to wait to be notified of our connection id.
65 this->do_action_ = &Peer_Handler::await_connection_id;
66 return 0;
69 int
70 Peer_Handler::transmit (ACE_Message_Block *mb,
71 size_t n,
72 int event_type)
74 Event *event = (Event *) mb->rd_ptr ();
76 // Initialize the header.
77 new (&event->header_) Event_Header (n,
78 this->connection_id_,
79 event_type,
80 0);
82 // Convert all the fields into network byte order.
83 event->header_.encode ();
85 // Move the write pointer to the end of the event.
86 mb->wr_ptr (sizeof (Event_Header) + n);
88 if (this->put (mb) == -1)
90 if (errno == EWOULDBLOCK) // The queue has filled up!
91 ACE_ERROR ((LM_ERROR,
92 ACE_TEXT ("%p\n"),
93 ACE_TEXT ("gateway is flow controlled, so we're dropping events")));
94 else
95 ACE_ERROR ((LM_ERROR,
96 ACE_TEXT ("%p\n"),
97 ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed.
98 // Caller is responsible for freeing a ACE_Message_Block
99 // if failures occur.
100 mb->release ();
101 return -1;
103 return 0;
106 // Read events from stdin and send them to the gatewayd.
109 Peer_Handler::transmit_stdin ()
111 // If return value is -1, then first_time_ must be reset to 1.
112 int result = 0;
113 if (this->connection_id_ != -1)
115 ACE_Message_Block *mb = 0;
117 ACE_NEW_RETURN (mb,
118 ACE_Message_Block (sizeof (Event)),
119 -1);
121 // Cast the message block payload into an <Event> pointer.
122 Event *event = (Event *) mb->rd_ptr ();
124 ssize_t n = ACE_OS::read (ACE_STDIN,
125 event->data_,
126 sizeof event->data_);
127 switch (n)
129 case 0:
130 ACE_DEBUG ((LM_DEBUG,
131 ACE_TEXT ("stdin closing down\n")));
133 // Take stdin out of the ACE_Reactor so we stop trying to
134 // send events.
135 ACE_Reactor::instance ()->remove_handler
136 (ACE_STDIN,
137 ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);
138 mb->release ();
139 result = 0; //
140 break;
141 /* NOTREACHED */
142 case -1:
143 mb->release ();
144 ACE_ERROR ((LM_ERROR,
145 ACE_TEXT ("%p\n"),
146 ACE_TEXT ("read")));
147 result = 0; //
148 break;
149 /* NOTREACHED */
150 default:
151 // Do not return directly, save the return value.
152 result = this->transmit (mb, n, ROUTING_EVENT);
153 break;
154 /* NOTREACHED */
157 // Do not return at here, but at exit of function.
158 /*return 0;*/
160 else
162 ACE_DEBUG ((LM_DEBUG,
163 ACE_TEXT ("Must transmit over an opened channel.\n")));
164 result = -1; // Save return value at here, return at exit of function.
166 // If transmit error, the stdin-thread will be cancelled, so should
167 // reset first_time_ to 1, which will register_stdin_handler again.
168 if (result == -1)
169 first_time_ = 1;
171 return result;
174 // Perform a non-blocking <put> of event MB. If we are unable to send
175 // the entire event the remainder is re-queue'd at the *front* of the
176 // Message_Queue.
179 Peer_Handler::nonblk_put (ACE_Message_Block *mb)
181 // Try to send the event. If we don't send it all (e.g., due to
182 // flow control), then re-queue the remainder at the head of the
183 // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via
184 // <handle_output>) when it is possible to try again.
186 ssize_t n = this->send (mb);
188 if (n == -1)
189 // -1 is returned only when things have really gone wrong (i.e.,
190 // not when flow control occurs).
191 return -1;
192 else if (errno == EWOULDBLOCK)
194 // We didn't manage to send everything, so requeue.
195 ACE_DEBUG ((LM_DEBUG,
196 ACE_TEXT ("queueing activated on handle %d to connection id %d\n"),
197 this->get_handle (),
198 this->connection_id_));
200 // Re-queue in *front* of the list to preserve order.
201 if (this->msg_queue ()->enqueue_head
202 (mb,
203 (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
204 ACE_ERROR_RETURN ((LM_ERROR,
205 ACE_TEXT ("%p\n"),
206 ACE_TEXT ("enqueue_head")),
207 -1);
208 // Tell ACE_Reactor to call us back when we can send again.
209 if (ACE_Reactor::instance ()->schedule_wakeup
210 (this, ACE_Event_Handler::WRITE_MASK) == -1)
211 ACE_ERROR_RETURN ((LM_ERROR,
212 ACE_TEXT ("%p\n"),
213 ACE_TEXT ("schedule_wakeup")),
214 -1);
215 return 0;
217 else
218 return n;
221 // Finish sending a event when flow control conditions abate. This
222 // method is automatically called by the ACE_Reactor.
225 Peer_Handler::handle_output (ACE_HANDLE)
227 ACE_Message_Block *mb = 0;
229 ACE_DEBUG ((LM_DEBUG,
230 ACE_TEXT ("in handle_output\n")));
232 if (this->msg_queue ()->dequeue_head
233 (mb,
234 (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
236 switch (this->nonblk_put (mb))
238 case 0: // Partial send.
239 ACE_ASSERT (errno == EWOULDBLOCK);
240 // Didn't write everything this time, come back later...
241 break;
242 /* NOTREACHED */
243 case -1:
244 // Caller is responsible for freeing a ACE_Message_Block if
245 // failures occur.
246 mb->release ();
247 ACE_ERROR ((LM_ERROR,
248 ACE_TEXT ("%p\n"),
249 ACE_TEXT ("transmission failure in handle_output")));
250 ACE_FALLTHROUGH;
251 default: // Sent the whole thing.
252 // If we succeed in writing the entire event (or we did not
253 // fail due to EWOULDBLOCK) then check if there are more
254 // events on the <ACE_Message_Queue>. If there aren't, tell
255 // the <ACE_Reactor> not to notify us anymore (at least
256 // until there are new events queued up).
258 if (this->msg_queue ()->is_empty ())
260 ACE_DEBUG ((LM_DEBUG,
261 ACE_TEXT ("queue now empty on handle %d to connection id %d\n"),
262 this->get_handle (),
263 this->connection_id_));
265 if (ACE_Reactor::instance ()->cancel_wakeup
266 (this, ACE_Event_Handler::WRITE_MASK) == -1)
267 ACE_ERROR ((LM_ERROR,
268 ACE_TEXT ("%p\n"),
269 ACE_TEXT ("cancel_wakeup")));
272 return 0;
274 else
275 // If the list is empty there's a bug!
276 ACE_ERROR_RETURN ((LM_ERROR,
277 ACE_TEXT ("%p\n"),
278 ACE_TEXT ("dequeue_head")),
282 // Send an event to a peer (may block if necessary).
285 Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
287 if (this->msg_queue ()->is_empty ())
288 // Try to send the event *without* blocking!
289 return this->nonblk_put (mb);
290 else
291 // If we have queued up events due to flow control then just
292 // enqueue and return.
293 return this->msg_queue ()->enqueue_tail
294 (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
297 // Send an Peer event to gatewayd.
300 Peer_Handler::send (ACE_Message_Block *mb)
302 size_t len = mb->length ();
304 ssize_t n = this->peer ().send (mb->rd_ptr (), len);
306 if (n <= 0)
307 return errno == EWOULDBLOCK ? 0 : n;
308 else if (n < (ssize_t) len)
310 // Re-adjust pointer to skip over the part we did send.
311 mb->rd_ptr (n);
312 this->total_bytes_ += n;
314 else // if (n == length).
316 // The whole event is sent, we can now safely deallocate the
317 // buffer. Note that this should decrement a reference count...
318 this->total_bytes_ += n;
319 mb->release ();
320 errno = 0;
323 ACE_DEBUG ((LM_DEBUG,
324 ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"),
326 this->total_bytes_));
327 return n;
330 // Receive an Event from gatewayd. Handles fragmentation.
333 Peer_Handler::recv (ACE_Message_Block *&mb)
335 if (this->msg_frag_ == 0)
336 // No existing fragment...
337 ACE_NEW_RETURN (this->msg_frag_,
338 ACE_Message_Block (sizeof (Event)),
339 -1);
341 Event *event = (Event *) this->msg_frag_->rd_ptr ();
342 ssize_t header_received = 0;
344 const size_t HEADER_SIZE = sizeof (Event_Header);
345 ssize_t header_bytes_left_to_read =
346 HEADER_SIZE - this->msg_frag_->length ();
348 if (header_bytes_left_to_read > 0)
350 header_received = this->peer ().recv
351 (this->msg_frag_->wr_ptr (),
352 header_bytes_left_to_read);
354 if (header_received == -1 /* error */
355 || header_received == 0 /* EOF */)
357 ACE_ERROR ((LM_ERROR,
358 ACE_TEXT ("%p\n"),
359 ACE_TEXT ("Recv error during header read")));
360 ACE_DEBUG ((LM_DEBUG,
361 ACE_TEXT ("attempted to read %d bytes\n"),
362 header_bytes_left_to_read));
363 this->msg_frag_ = this->msg_frag_->release ();
364 return header_received;
367 // Bump the write pointer by the amount read.
368 this->msg_frag_->wr_ptr (header_received);
370 // At this point we may or may not have the ENTIRE header.
371 if (this->msg_frag_->length () < HEADER_SIZE)
373 ACE_DEBUG ((LM_DEBUG,
374 ACE_TEXT ("Partial header received: only %d bytes\n"),
375 this->msg_frag_->length ()));
376 // Notify the caller that we didn't get an entire event.
377 errno = EWOULDBLOCK;
378 return -1;
381 // Convert the header into host byte order so that we can access
382 // it directly without having to repeatedly muck with it...
383 event->header_.decode ();
385 if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
387 // This data_ payload is too big!
388 errno = EINVAL;
389 ACE_DEBUG ((LM_DEBUG,
390 ACE_TEXT ("Data payload is too big (%d bytes)\n"),
391 event->header_.len_));
392 return -1;
396 // At this point there is a complete, valid header in Event. Now we
397 // need to get the event payload. Due to incomplete reads this may
398 // not be the first time we've read in a fragment for this message.
399 // We account for this here. Note that the first time in here
400 // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we
401 // do a successful fragment read, we advance <wr_ptr>. Therefore,
402 // by subtracting how much we've already read from the
403 // <event->header_.len_> we complete the
404 // <data_bytes_left_to_read>...
406 ssize_t data_bytes_left_to_read =
407 ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
409 // peer().recv() should not be called when data_bytes_left_to_read is 0.
410 ssize_t data_received = !data_bytes_left_to_read ? 0 :
411 this->peer ().recv (this->msg_frag_->wr_ptr (),
412 data_bytes_left_to_read);
414 // Try to receive the remainder of the event.
416 switch (data_received)
418 case -1:
419 if (errno == EWOULDBLOCK)
420 // This might happen if only the header came through.
421 return -1;
422 ACE_FALLTHROUGH;
423 case 0: // Premature EOF.
424 if (data_bytes_left_to_read)
426 this->msg_frag_ = this->msg_frag_->release ();
427 return 0;
429 ACE_FALLTHROUGH;
430 default:
431 // Set the write pointer at 1 past the end of the event.
432 this->msg_frag_->wr_ptr (data_received);
434 if (data_received != data_bytes_left_to_read)
436 errno = EWOULDBLOCK;
437 // Inform caller that we didn't get the whole event.
438 return -1;
440 else
442 // Set the read pointer to the beginning of the event.
443 this->msg_frag_->rd_ptr (this->msg_frag_->base ());
445 mb = this->msg_frag_;
447 // Reset the pointer to indicate we've got an entire event.
448 this->msg_frag_ = 0;
451 ACE_DEBUG ((LM_DEBUG,
452 ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"),
453 event->header_.connection_id_,
454 event->header_.len_,
455 data_received + header_received));
456 if (Options::instance ()->enabled (Options::VERBOSE))
457 ACE_DEBUG ((LM_DEBUG,
458 ACE_TEXT ("data_ = %*s\n"),
459 event->header_.len_ - 2,
460 event->data_));
461 return data_received + header_received;
465 // Receive various types of input (e.g., Peer event from the gatewayd,
466 // as well as stdio).
469 Peer_Handler::handle_input (ACE_HANDLE sd)
471 ACE_DEBUG ((LM_DEBUG,
472 ACE_TEXT ("in handle_input, sd = %d\n"),
473 sd));
474 if (sd == ACE_STDIN) // Handle event from stdin.
475 return this->transmit_stdin ();
476 else
477 // Perform the appropriate action depending on the state we are
478 // in.
479 return (this->*do_action_) ();
482 // Action that receives our connection id from the Gateway.
485 Peer_Handler::await_connection_id ()
487 ssize_t n = this->peer ().recv (&this->connection_id_,
488 sizeof this->connection_id_);
490 if (n != (ssize_t) sizeof this->connection_id_)
492 if (n == 0)
493 ACE_ERROR_RETURN ((LM_ERROR,
494 ACE_TEXT ("gatewayd has closed down unexpectedly\n")),
495 -1);
496 else
497 ACE_ERROR_RETURN ((LM_ERROR,
498 ACE_TEXT ("%p, bytes received on handle %d = %d\n"),
499 ACE_TEXT ("recv"),
500 this->get_handle (),
502 -1);
504 else
506 this->connection_id_ = ntohl (this->connection_id_);
507 ACE_DEBUG ((LM_DEBUG,
508 ACE_TEXT ("assigned connection id %d\n"),
509 this->connection_id_));
512 // Subscribe for events if we're a Consumer.
513 if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
514 this->subscribe ();
516 // No need to disconnect by timeout.
517 ACE_Reactor::instance ()->cancel_timer(this);
518 // Transition to the action that waits for Peer events.
519 this->do_action_ = &Peer_Handler::await_events;
521 // Reset standard input.
522 ACE_OS::rewind (stdin);
524 // Call register_stdin_handler only once, until the stdin-thread
525 // closed which caused by transmit_stdin error.
526 if (first_time_)
528 // Register this handler to receive test events on stdin.
529 if (ACE_Event_Handler::register_stdin_handler
530 (this,
531 ACE_Reactor::instance (),
532 ACE_Thread_Manager::instance ()) == -1)
533 ACE_ERROR_RETURN ((LM_ERROR,
534 ACE_TEXT ("(%t) %p\n"),
535 ACE_TEXT ("register_stdin_handler")),
536 -1);
538 // Next time in await_connection_id(), I'll don't call
539 // register_stdin_handler().
540 first_time_ = 0;
542 return 0;
546 Peer_Handler::subscribe ()
548 ACE_Message_Block *mb = 0;
550 ACE_NEW_RETURN (mb,
551 ACE_Message_Block (sizeof (Event)),
552 -1);
554 Subscription *subscription =
555 (Subscription *) ((Event *) mb->rd_ptr ())->data_;
556 subscription->connection_id_ =
557 Options::instance ()->connection_id ();
559 return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
562 // Action that receives events from the Gateway.
565 Peer_Handler::await_events ()
567 ACE_Message_Block *mb = 0;
569 ssize_t n = this->recv (mb);
571 switch (n)
573 case 0:
574 ACE_ERROR_RETURN ((LM_ERROR,
575 ACE_TEXT ("gatewayd has closed down\n")),
576 -1);
577 /* NOTREACHED */
578 case -1:
579 if (errno == EWOULDBLOCK)
580 // A short-read, we'll come back and finish it up later on!
581 return 0;
582 else
583 ACE_ERROR_RETURN ((LM_ERROR,
584 ACE_TEXT ("%p\n"),
585 ACE_TEXT ("recv")),
586 -1);
587 /* NOTREACHED */
588 default:
590 // We got a valid event, so let's process it now! At the
591 // moment, we just print out the event contents...
593 Event *event = (Event *) mb->rd_ptr ();
594 this->total_bytes_ += mb->length ();
596 ACE_DEBUG ((LM_DEBUG,
597 ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"),
598 event->header_.connection_id_,
599 event->header_.len_,
600 this->total_bytes_));
601 if (Options::instance ()->enabled (Options::VERBOSE))
602 ACE_DEBUG ((LM_DEBUG,
603 ACE_TEXT ("data_ = %*s\n"),
604 event->header_.len_ - 2,
605 event->data_));
606 mb->release ();
607 return 0;
612 // Periodically send events via ACE_Reactor timer mechanism.
615 Peer_Handler::handle_timeout (const ACE_Time_Value &,
616 const void *)
618 // Shut down the handler.
619 return this->handle_close ();
622 Peer_Handler::~Peer_Handler ()
624 // Shut down the handler.
625 this->handle_close ();
628 // Handle shutdown of the Peer object.
631 Peer_Handler::handle_close (ACE_HANDLE,
632 ACE_Reactor_Mask)
634 if (this->get_handle () != ACE_INVALID_HANDLE)
636 ACE_DEBUG ((LM_DEBUG,
637 ACE_TEXT ("shutting down Peer on handle %d\n"),
638 this->get_handle ()));
640 ACE_Reactor_Mask mask =
641 ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK;
643 // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>
644 // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL>
645 // instructs the ACE_Reactor *not* to call <handle_close>, which
646 // would otherwise lead to infinite recursion!).
647 ACE_Reactor::instance ()->remove_handler
648 (ACE_STDIN, mask);
650 // Deregister this handler with the ACE_Reactor.
651 if (ACE_Reactor::instance ()->remove_handler
652 (this, mask) == -1)
653 ACE_ERROR_RETURN ((LM_ERROR,
654 ACE_TEXT ("handle = %d: %p\n"),
655 this->get_handle (),
656 ACE_TEXT ("remove_handler")),
657 -1);
658 // Close down the peer.
659 this->peer ().close ();
661 return 0;
665 Peer_Acceptor::start (u_short port)
667 // This object only gets allocated once and is just recycled
668 // forever.
669 ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1);
671 this->addr_.set (port);
673 ACE_DEBUG ((LM_DEBUG,
674 ACE_TEXT ("opening acceptor at port %d\n"),
675 port));
677 // Call down to the <Acceptor::open> method.
678 if (this->inherited::open (this->addr_) == -1)
679 ACE_ERROR_RETURN ((LM_ERROR,
680 ACE_TEXT ("%p\n"),
681 ACE_TEXT ("open")),
682 -1);
683 else if (this->acceptor ().get_local_addr (this->addr_) == -1)
684 ACE_ERROR_RETURN ((LM_ERROR,
685 ACE_TEXT ("%p\n"),
686 ACE_TEXT ("get_local_addr")),
687 -1);
688 else
689 ACE_DEBUG ((LM_DEBUG,
690 ACE_TEXT ("accepting at port %d\n"),
691 this->addr_.get_port_number ()));
692 return 0;
695 Peer_Acceptor::Peer_Acceptor ()
696 : peer_handler_ (0)
701 Peer_Acceptor::close ()
703 // Will trigger a delete.
704 if (this->peer_handler_ != 0)
705 this->peer_handler_->destroy ();
707 // Close down the base class.
708 return this->inherited::close ();
711 // Note how this method just passes back the pre-allocated
712 // <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new
713 // one each time!
716 Peer_Acceptor::make_svc_handler (Peer_Handler *&sh)
718 sh = this->peer_handler_;
719 return 0;
723 Peer_Connector::open_connector (Peer_Handler *&peer_handler,
724 u_short port)
726 // This object only gets allocated once and is just recycled
727 // forever.
728 ACE_NEW_RETURN (peer_handler,
729 Peer_Handler,
730 -1);
732 ACE_INET_Addr addr (port,
733 Options::instance ()->connector_host ());
735 ACE_DEBUG ((LM_DEBUG,
736 ACE_TEXT ("connecting to %s:%d\n"),
737 addr.get_host_name (),
738 addr.get_port_number ()));
740 if (this->connect (peer_handler, addr) == -1)
741 ACE_ERROR_RETURN ((LM_ERROR,
742 ACE_TEXT ("%p\n"),
743 ACE_TEXT ("connect")),
744 -1);
745 else
746 ACE_DEBUG ((LM_DEBUG,
747 ACE_TEXT ("connected to %C:%d\n"),
748 addr.get_host_name (),
749 addr.get_port_number ()));
750 return 0;
754 Peer_Connector::open (ACE_Reactor *, int)
756 this->supplier_peer_handler_ = 0;
757 this->consumer_peer_handler_ = 0;
759 if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)
760 && this->open_connector (this->supplier_peer_handler_,
761 Options::instance ()->supplier_connector_port ()) == -1)
762 return -1;
764 if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)
765 && this->open_connector (this->consumer_peer_handler_,
766 Options::instance ()->consumer_connector_port ()) == -1)
767 return -1;
769 return 0;
773 Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *)
775 if (signum != SIGPIPE)
777 // Shut down the main event loop.
778 ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit?
779 ACE_Reactor::instance ()->end_reactor_event_loop();
782 return 0;
785 // Returns information on the currently active service.
788 Peer_Factory::info (ACE_TCHAR **strp, size_t length) const
790 ACE_TCHAR buf[BUFSIZ];
791 ACE_TCHAR consumer_addr_str[BUFSIZ];
792 ACE_TCHAR supplier_addr_str[BUFSIZ];
794 ACE_INET_Addr addr;
796 if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1)
797 return -1;
798 else if (addr.addr_to_string (consumer_addr_str,
799 sizeof addr) == -1)
800 return -1;
801 else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1)
802 return -1;
803 else if (addr.addr_to_string (supplier_addr_str,
804 sizeof addr) == -1)
805 return -1;
807 ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:"));
808 ACE_OS::strcat (buf, consumer_addr_str);
809 ACE_OS::strcat (buf, ACE_TEXT ("|S:"));
810 ACE_OS::strcat (buf, supplier_addr_str);
811 ACE_OS::strcat
812 (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n"));
814 if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
815 return -1;
816 else
817 ACE_OS::strncpy (*strp, buf, length);
818 return ACE_OS::strlen (buf);
821 // Hook called by the explicit dynamic linking facility to terminate
822 // the peer.
825 Peer_Factory::fini ()
827 this->consumer_acceptor_.close ();
828 this->supplier_acceptor_.close ();
829 return 0;
832 // Hook called by the explicit dynamic linking facility to initialize
833 // the peer.
836 Peer_Factory::init (int argc, ACE_TCHAR *argv[])
838 Options::instance ()->parse_args (argc, argv);
840 ACE_Sig_Set sig_set;
842 sig_set.sig_add (SIGINT);
843 sig_set.sig_add (SIGQUIT);
844 sig_set.sig_add (SIGPIPE);
846 // Register ourselves to receive signals so we can shut down
847 // gracefully.
849 if (ACE_Reactor::instance ()->register_handler (sig_set,
850 this) == -1)
851 ACE_ERROR_RETURN ((LM_ERROR,
852 ACE_TEXT ("%p\n"),
853 ACE_TEXT ("register_handler")),
854 -1);
856 if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
857 && this->supplier_acceptor_.start
858 (Options::instance ()->supplier_acceptor_port ()) == -1)
859 ACE_ERROR_RETURN ((LM_ERROR,
860 ACE_TEXT ("%p\n"),
861 ACE_TEXT ("Acceptor::open")),
862 -1);
863 else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
864 && this->consumer_acceptor_.start
865 (Options::instance ()->consumer_acceptor_port ()) == -1)
866 ACE_ERROR_RETURN ((LM_ERROR,
867 ACE_TEXT ("%p\n"),
868 ACE_TEXT ("Acceptor::open")),
869 -1);
870 else if (this->connector_.open () == -1)
871 ACE_ERROR_RETURN ((LM_ERROR,
872 ACE_TEXT ("%p\n"),
873 ACE_TEXT ("Connector::open")),
874 -1);
875 return 0;
878 // The following is a "Factory" used by the <ACE_Service_Config> and
879 // svc.conf file to dynamically initialize the <Peer_Acceptor> and
880 // <Peer_Connector>.
882 ACE_SVC_FACTORY_DEFINE (Peer_Factory)