1 #define ACE_BUILD_SVC_DLL
3 #include "ace/OS_NS_stdio.h"
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_unistd.h"
6 #include "ace/Signal.h"
9 Peer_Handler::Peer_Handler ()
10 : connection_id_ (-1), // Maybe it's better than 0.
14 // Set the high water mark of the <ACE_Message_Queue>. This is used
15 // to exert flow control.
16 this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
17 first_time_
= 1; // It will be first time to open Peer_Handler.
20 // Upcall from the <ACE_Acceptor::handle_input> that turns control
21 // over to our application-specific Gateway handler.
24 Peer_Handler::open (void *a
)
27 ACE_TEXT ("handle = %d\n"),
28 this->peer ().get_handle ()));
30 // Call down to the base class to activate and register this handler
31 // with an <ACE_Reactor>.
32 if (this->inherited::open (a
) == -1)
33 ACE_ERROR_RETURN ((LM_ERROR
,
38 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
39 ACE_ERROR_RETURN ((LM_ERROR
,
44 ACE_Time_Value
timeout (Options::instance ()->timeout ());
46 // Schedule the time between disconnects. This should really be a
47 // "tunable" parameter.
48 if (ACE_Reactor::instance ()->schedule_timer
49 (this, 0, timeout
) == -1)
52 ACE_TEXT ("schedule_timer")));
54 // If there are events left in the queue, make sure we enable the
55 // <ACE_Reactor> appropriately to get them sent out.
56 if (this->msg_queue ()->is_empty () == 0
57 && ACE_Reactor::instance ()->schedule_wakeup
58 (this, ACE_Event_Handler::WRITE_MASK
) == -1)
59 ACE_ERROR_RETURN ((LM_ERROR
,
61 ACE_TEXT ("schedule_wakeup")),
64 // First action is to wait to be notified of our connection id.
65 this->do_action_
= &Peer_Handler::await_connection_id
;
70 Peer_Handler::transmit (ACE_Message_Block
*mb
,
74 Event
*event
= (Event
*) mb
->rd_ptr ();
76 // Initialize the header.
77 new (&event
->header_
) Event_Header (n
,
82 // Convert all the fields into network byte order.
83 event
->header_
.encode ();
85 // Move the write pointer to the end of the event.
86 mb
->wr_ptr (sizeof (Event_Header
) + n
);
88 if (this->put (mb
) == -1)
90 if (errno
== EWOULDBLOCK
) // The queue has filled up!
93 ACE_TEXT ("gateway is flow controlled, so we're dropping events")));
97 ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed.
98 // Caller is responsible for freeing a ACE_Message_Block
106 // Read events from stdin and send them to the gatewayd.
109 Peer_Handler::transmit_stdin ()
111 // If return value is -1, then first_time_ must be reset to 1.
113 if (this->connection_id_
!= -1)
115 ACE_Message_Block
*mb
= 0;
118 ACE_Message_Block (sizeof (Event
)),
121 // Cast the message block payload into an <Event> pointer.
122 Event
*event
= (Event
*) mb
->rd_ptr ();
124 ssize_t n
= ACE_OS::read (ACE_STDIN
,
126 sizeof event
->data_
);
130 ACE_DEBUG ((LM_DEBUG
,
131 ACE_TEXT ("stdin closing down\n")));
133 // Take stdin out of the ACE_Reactor so we stop trying to
135 ACE_Reactor::instance ()->remove_handler
137 ACE_Event_Handler::DONT_CALL
| ACE_Event_Handler::READ_MASK
);
144 ACE_ERROR ((LM_ERROR
,
151 // Do not return directly, save the return value.
152 result
= this->transmit (mb
, n
, ROUTING_EVENT
);
157 // Do not return at here, but at exit of function.
162 ACE_DEBUG ((LM_DEBUG
,
163 ACE_TEXT ("Must transmit over an opened channel.\n")));
164 result
= -1; // Save return value at here, return at exit of function.
166 // If transmit error, the stdin-thread will be cancelled, so should
167 // reset first_time_ to 1, which will register_stdin_handler again.
174 // Perform a non-blocking <put> of event MB. If we are unable to send
175 // the entire event the remainder is re-queue'd at the *front* of the
179 Peer_Handler::nonblk_put (ACE_Message_Block
*mb
)
181 // Try to send the event. If we don't send it all (e.g., due to
182 // flow control), then re-queue the remainder at the head of the
183 // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via
184 // <handle_output>) when it is possible to try again.
186 ssize_t n
= this->send (mb
);
189 // -1 is returned only when things have really gone wrong (i.e.,
190 // not when flow control occurs).
192 else if (errno
== EWOULDBLOCK
)
194 // We didn't manage to send everything, so requeue.
195 ACE_DEBUG ((LM_DEBUG
,
196 ACE_TEXT ("queueing activated on handle %d to connection id %d\n"),
198 this->connection_id_
));
200 // Re-queue in *front* of the list to preserve order.
201 if (this->msg_queue ()->enqueue_head
203 (ACE_Time_Value
*) &ACE_Time_Value::zero
) == -1)
204 ACE_ERROR_RETURN ((LM_ERROR
,
206 ACE_TEXT ("enqueue_head")),
208 // Tell ACE_Reactor to call us back when we can send again.
209 if (ACE_Reactor::instance ()->schedule_wakeup
210 (this, ACE_Event_Handler::WRITE_MASK
) == -1)
211 ACE_ERROR_RETURN ((LM_ERROR
,
213 ACE_TEXT ("schedule_wakeup")),
221 // Finish sending a event when flow control conditions abate. This
222 // method is automatically called by the ACE_Reactor.
225 Peer_Handler::handle_output (ACE_HANDLE
)
227 ACE_Message_Block
*mb
= 0;
229 ACE_DEBUG ((LM_DEBUG
,
230 ACE_TEXT ("in handle_output\n")));
232 if (this->msg_queue ()->dequeue_head
234 (ACE_Time_Value
*) &ACE_Time_Value::zero
) != -1)
236 switch (this->nonblk_put (mb
))
238 case 0: // Partial send.
239 ACE_ASSERT (errno
== EWOULDBLOCK
);
240 // Didn't write everything this time, come back later...
244 // Caller is responsible for freeing a ACE_Message_Block if
247 ACE_ERROR ((LM_ERROR
,
249 ACE_TEXT ("transmission failure in handle_output")));
251 default: // Sent the whole thing.
252 // If we succeed in writing the entire event (or we did not
253 // fail due to EWOULDBLOCK) then check if there are more
254 // events on the <ACE_Message_Queue>. If there aren't, tell
255 // the <ACE_Reactor> not to notify us anymore (at least
256 // until there are new events queued up).
258 if (this->msg_queue ()->is_empty ())
260 ACE_DEBUG ((LM_DEBUG
,
261 ACE_TEXT ("queue now empty on handle %d to connection id %d\n"),
263 this->connection_id_
));
265 if (ACE_Reactor::instance ()->cancel_wakeup
266 (this, ACE_Event_Handler::WRITE_MASK
) == -1)
267 ACE_ERROR ((LM_ERROR
,
269 ACE_TEXT ("cancel_wakeup")));
275 // If the list is empty there's a bug!
276 ACE_ERROR_RETURN ((LM_ERROR
,
278 ACE_TEXT ("dequeue_head")),
282 // Send an event to a peer (may block if necessary).
285 Peer_Handler::put (ACE_Message_Block
*mb
, ACE_Time_Value
*)
287 if (this->msg_queue ()->is_empty ())
288 // Try to send the event *without* blocking!
289 return this->nonblk_put (mb
);
291 // If we have queued up events due to flow control then just
292 // enqueue and return.
293 return this->msg_queue ()->enqueue_tail
294 (mb
, (ACE_Time_Value
*) &ACE_Time_Value::zero
);
297 // Send an Peer event to gatewayd.
300 Peer_Handler::send (ACE_Message_Block
*mb
)
302 size_t len
= mb
->length ();
304 ssize_t n
= this->peer ().send (mb
->rd_ptr (), len
);
307 return errno
== EWOULDBLOCK
? 0 : n
;
308 else if (n
< (ssize_t
) len
)
310 // Re-adjust pointer to skip over the part we did send.
312 this->total_bytes_
+= n
;
314 else // if (n == length).
316 // The whole event is sent, we can now safely deallocate the
317 // buffer. Note that this should decrement a reference count...
318 this->total_bytes_
+= n
;
323 ACE_DEBUG ((LM_DEBUG
,
324 ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"),
326 this->total_bytes_
));
330 // Receive an Event from gatewayd. Handles fragmentation.
333 Peer_Handler::recv (ACE_Message_Block
*&mb
)
335 if (this->msg_frag_
== 0)
336 // No existing fragment...
337 ACE_NEW_RETURN (this->msg_frag_
,
338 ACE_Message_Block (sizeof (Event
)),
341 Event
*event
= (Event
*) this->msg_frag_
->rd_ptr ();
342 ssize_t header_received
= 0;
344 const size_t HEADER_SIZE
= sizeof (Event_Header
);
345 ssize_t header_bytes_left_to_read
=
346 HEADER_SIZE
- this->msg_frag_
->length ();
348 if (header_bytes_left_to_read
> 0)
350 header_received
= this->peer ().recv
351 (this->msg_frag_
->wr_ptr (),
352 header_bytes_left_to_read
);
354 if (header_received
== -1 /* error */
355 || header_received
== 0 /* EOF */)
357 ACE_ERROR ((LM_ERROR
,
359 ACE_TEXT ("Recv error during header read")));
360 ACE_DEBUG ((LM_DEBUG
,
361 ACE_TEXT ("attempted to read %d bytes\n"),
362 header_bytes_left_to_read
));
363 this->msg_frag_
= this->msg_frag_
->release ();
364 return header_received
;
367 // Bump the write pointer by the amount read.
368 this->msg_frag_
->wr_ptr (header_received
);
370 // At this point we may or may not have the ENTIRE header.
371 if (this->msg_frag_
->length () < HEADER_SIZE
)
373 ACE_DEBUG ((LM_DEBUG
,
374 ACE_TEXT ("Partial header received: only %d bytes\n"),
375 this->msg_frag_
->length ()));
376 // Notify the caller that we didn't get an entire event.
381 // Convert the header into host byte order so that we can access
382 // it directly without having to repeatedly muck with it...
383 event
->header_
.decode ();
385 if (event
->header_
.len_
> ACE_INT32 (sizeof event
->data_
))
387 // This data_ payload is too big!
389 ACE_DEBUG ((LM_DEBUG
,
390 ACE_TEXT ("Data payload is too big (%d bytes)\n"),
391 event
->header_
.len_
));
396 // At this point there is a complete, valid header in Event. Now we
397 // need to get the event payload. Due to incomplete reads this may
398 // not be the first time we've read in a fragment for this message.
399 // We account for this here. Note that the first time in here
400 // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we
401 // do a successful fragment read, we advance <wr_ptr>. Therefore,
402 // by subtracting how much we've already read from the
403 // <event->header_.len_> we complete the
404 // <data_bytes_left_to_read>...
406 ssize_t data_bytes_left_to_read
=
407 ssize_t (event
->header_
.len_
- (msg_frag_
->wr_ptr () - event
->data_
));
409 // peer().recv() should not be called when data_bytes_left_to_read is 0.
410 ssize_t data_received
= !data_bytes_left_to_read
? 0 :
411 this->peer ().recv (this->msg_frag_
->wr_ptr (),
412 data_bytes_left_to_read
);
414 // Try to receive the remainder of the event.
416 switch (data_received
)
419 if (errno
== EWOULDBLOCK
)
420 // This might happen if only the header came through.
423 case 0: // Premature EOF.
424 if (data_bytes_left_to_read
)
426 this->msg_frag_
= this->msg_frag_
->release ();
431 // Set the write pointer at 1 past the end of the event.
432 this->msg_frag_
->wr_ptr (data_received
);
434 if (data_received
!= data_bytes_left_to_read
)
437 // Inform caller that we didn't get the whole event.
442 // Set the read pointer to the beginning of the event.
443 this->msg_frag_
->rd_ptr (this->msg_frag_
->base ());
445 mb
= this->msg_frag_
;
447 // Reset the pointer to indicate we've got an entire event.
451 ACE_DEBUG ((LM_DEBUG
,
452 ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"),
453 event
->header_
.connection_id_
,
455 data_received
+ header_received
));
456 if (Options::instance ()->enabled (Options::VERBOSE
))
457 ACE_DEBUG ((LM_DEBUG
,
458 ACE_TEXT ("data_ = %*s\n"),
459 event
->header_
.len_
- 2,
461 return data_received
+ header_received
;
465 // Receive various types of input (e.g., Peer event from the gatewayd,
466 // as well as stdio).
469 Peer_Handler::handle_input (ACE_HANDLE sd
)
471 ACE_DEBUG ((LM_DEBUG
,
472 ACE_TEXT ("in handle_input, sd = %d\n"),
474 if (sd
== ACE_STDIN
) // Handle event from stdin.
475 return this->transmit_stdin ();
477 // Perform the appropriate action depending on the state we are
479 return (this->*do_action_
) ();
482 // Action that receives our connection id from the Gateway.
485 Peer_Handler::await_connection_id ()
487 ssize_t n
= this->peer ().recv (&this->connection_id_
,
488 sizeof this->connection_id_
);
490 if (n
!= (ssize_t
) sizeof this->connection_id_
)
493 ACE_ERROR_RETURN ((LM_ERROR
,
494 ACE_TEXT ("gatewayd has closed down unexpectedly\n")),
497 ACE_ERROR_RETURN ((LM_ERROR
,
498 ACE_TEXT ("%p, bytes received on handle %d = %d\n"),
506 this->connection_id_
= ntohl (this->connection_id_
);
507 ACE_DEBUG ((LM_DEBUG
,
508 ACE_TEXT ("assigned connection id %d\n"),
509 this->connection_id_
));
512 // Subscribe for events if we're a Consumer.
513 if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR
))
516 // No need to disconnect by timeout.
517 ACE_Reactor::instance ()->cancel_timer(this);
518 // Transition to the action that waits for Peer events.
519 this->do_action_
= &Peer_Handler::await_events
;
521 // Reset standard input.
522 ACE_OS::rewind (stdin
);
524 // Call register_stdin_handler only once, until the stdin-thread
525 // closed which caused by transmit_stdin error.
528 // Register this handler to receive test events on stdin.
529 if (ACE_Event_Handler::register_stdin_handler
531 ACE_Reactor::instance (),
532 ACE_Thread_Manager::instance ()) == -1)
533 ACE_ERROR_RETURN ((LM_ERROR
,
534 ACE_TEXT ("(%t) %p\n"),
535 ACE_TEXT ("register_stdin_handler")),
538 // Next time in await_connection_id(), I'll don't call
539 // register_stdin_handler().
546 Peer_Handler::subscribe ()
548 ACE_Message_Block
*mb
= 0;
551 ACE_Message_Block (sizeof (Event
)),
554 Subscription
*subscription
=
555 (Subscription
*) ((Event
*) mb
->rd_ptr ())->data_
;
556 subscription
->connection_id_
=
557 Options::instance ()->connection_id ();
559 return this->transmit (mb
, sizeof *subscription
, SUBSCRIPTION_EVENT
);
562 // Action that receives events from the Gateway.
565 Peer_Handler::await_events ()
567 ACE_Message_Block
*mb
= 0;
569 ssize_t n
= this->recv (mb
);
574 ACE_ERROR_RETURN ((LM_ERROR
,
575 ACE_TEXT ("gatewayd has closed down\n")),
579 if (errno
== EWOULDBLOCK
)
580 // A short-read, we'll come back and finish it up later on!
583 ACE_ERROR_RETURN ((LM_ERROR
,
590 // We got a valid event, so let's process it now! At the
591 // moment, we just print out the event contents...
593 Event
*event
= (Event
*) mb
->rd_ptr ();
594 this->total_bytes_
+= mb
->length ();
596 ACE_DEBUG ((LM_DEBUG
,
597 ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"),
598 event
->header_
.connection_id_
,
600 this->total_bytes_
));
601 if (Options::instance ()->enabled (Options::VERBOSE
))
602 ACE_DEBUG ((LM_DEBUG
,
603 ACE_TEXT ("data_ = %*s\n"),
604 event
->header_
.len_
- 2,
612 // Periodically send events via ACE_Reactor timer mechanism.
615 Peer_Handler::handle_timeout (const ACE_Time_Value
&,
618 // Shut down the handler.
619 return this->handle_close ();
622 Peer_Handler::~Peer_Handler ()
624 // Shut down the handler.
625 this->handle_close ();
628 // Handle shutdown of the Peer object.
631 Peer_Handler::handle_close (ACE_HANDLE
,
634 if (this->get_handle () != ACE_INVALID_HANDLE
)
636 ACE_DEBUG ((LM_DEBUG
,
637 ACE_TEXT ("shutting down Peer on handle %d\n"),
638 this->get_handle ()));
640 ACE_Reactor_Mask mask
=
641 ACE_Event_Handler::DONT_CALL
| ACE_Event_Handler::READ_MASK
;
643 // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>
644 // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL>
645 // instructs the ACE_Reactor *not* to call <handle_close>, which
646 // would otherwise lead to infinite recursion!).
647 ACE_Reactor::instance ()->remove_handler
650 // Deregister this handler with the ACE_Reactor.
651 if (ACE_Reactor::instance ()->remove_handler
653 ACE_ERROR_RETURN ((LM_ERROR
,
654 ACE_TEXT ("handle = %d: %p\n"),
656 ACE_TEXT ("remove_handler")),
658 // Close down the peer.
659 this->peer ().close ();
665 Peer_Acceptor::start (u_short port
)
667 // This object only gets allocated once and is just recycled
669 ACE_NEW_RETURN (peer_handler_
, Peer_Handler
, -1);
671 this->addr_
.set (port
);
673 ACE_DEBUG ((LM_DEBUG
,
674 ACE_TEXT ("opening acceptor at port %d\n"),
677 // Call down to the <Acceptor::open> method.
678 if (this->inherited::open (this->addr_
) == -1)
679 ACE_ERROR_RETURN ((LM_ERROR
,
683 else if (this->acceptor ().get_local_addr (this->addr_
) == -1)
684 ACE_ERROR_RETURN ((LM_ERROR
,
686 ACE_TEXT ("get_local_addr")),
689 ACE_DEBUG ((LM_DEBUG
,
690 ACE_TEXT ("accepting at port %d\n"),
691 this->addr_
.get_port_number ()));
695 Peer_Acceptor::Peer_Acceptor ()
701 Peer_Acceptor::close ()
703 // Will trigger a delete.
704 if (this->peer_handler_
!= 0)
705 this->peer_handler_
->destroy ();
707 // Close down the base class.
708 return this->inherited::close ();
711 // Note how this method just passes back the pre-allocated
712 // <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new
716 Peer_Acceptor::make_svc_handler (Peer_Handler
*&sh
)
718 sh
= this->peer_handler_
;
723 Peer_Connector::open_connector (Peer_Handler
*&peer_handler
,
726 // This object only gets allocated once and is just recycled
728 ACE_NEW_RETURN (peer_handler
,
732 ACE_INET_Addr
addr (port
,
733 Options::instance ()->connector_host ());
735 ACE_DEBUG ((LM_DEBUG
,
736 ACE_TEXT ("connecting to %s:%d\n"),
737 addr
.get_host_name (),
738 addr
.get_port_number ()));
740 if (this->connect (peer_handler
, addr
) == -1)
741 ACE_ERROR_RETURN ((LM_ERROR
,
743 ACE_TEXT ("connect")),
746 ACE_DEBUG ((LM_DEBUG
,
747 ACE_TEXT ("connected to %C:%d\n"),
748 addr
.get_host_name (),
749 addr
.get_port_number ()));
754 Peer_Connector::open (ACE_Reactor
*, int)
756 this->supplier_peer_handler_
= 0;
757 this->consumer_peer_handler_
= 0;
759 if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR
)
760 && this->open_connector (this->supplier_peer_handler_
,
761 Options::instance ()->supplier_connector_port ()) == -1)
764 if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR
)
765 && this->open_connector (this->consumer_peer_handler_
,
766 Options::instance ()->consumer_connector_port ()) == -1)
773 Peer_Factory::handle_signal (int signum
, siginfo_t
*, ucontext_t
*)
775 if (signum
!= SIGPIPE
)
777 // Shut down the main event loop.
778 ACE_DEBUG((LM_NOTICE
, ACE_TEXT ("Exit case signal\n"))); // Why do I exit?
779 ACE_Reactor::instance ()->end_reactor_event_loop();
785 // Returns information on the currently active service.
788 Peer_Factory::info (ACE_TCHAR
**strp
, size_t length
) const
790 ACE_TCHAR buf
[BUFSIZ
];
791 ACE_TCHAR consumer_addr_str
[BUFSIZ
];
792 ACE_TCHAR supplier_addr_str
[BUFSIZ
];
796 if (this->consumer_acceptor_
.acceptor ().get_local_addr (addr
) == -1)
798 else if (addr
.addr_to_string (consumer_addr_str
,
801 else if (this->supplier_acceptor_
.acceptor ().get_local_addr (addr
) == -1)
803 else if (addr
.addr_to_string (supplier_addr_str
,
807 ACE_OS::strcpy (buf
, ACE_TEXT ("peerd\t C:"));
808 ACE_OS::strcat (buf
, consumer_addr_str
);
809 ACE_OS::strcat (buf
, ACE_TEXT ("|S:"));
810 ACE_OS::strcat (buf
, supplier_addr_str
);
812 (buf
, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n"));
814 if (*strp
== 0 && (*strp
= ACE_OS::strdup (buf
)) == 0)
817 ACE_OS::strncpy (*strp
, buf
, length
);
818 return ACE_OS::strlen (buf
);
821 // Hook called by the explicit dynamic linking facility to terminate
825 Peer_Factory::fini ()
827 this->consumer_acceptor_
.close ();
828 this->supplier_acceptor_
.close ();
832 // Hook called by the explicit dynamic linking facility to initialize
836 Peer_Factory::init (int argc
, ACE_TCHAR
*argv
[])
838 Options::instance ()->parse_args (argc
, argv
);
842 sig_set
.sig_add (SIGINT
);
843 sig_set
.sig_add (SIGQUIT
);
844 sig_set
.sig_add (SIGPIPE
);
846 // Register ourselves to receive signals so we can shut down
849 if (ACE_Reactor::instance ()->register_handler (sig_set
,
851 ACE_ERROR_RETURN ((LM_ERROR
,
853 ACE_TEXT ("register_handler")),
856 if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR
)
857 && this->supplier_acceptor_
.start
858 (Options::instance ()->supplier_acceptor_port ()) == -1)
859 ACE_ERROR_RETURN ((LM_ERROR
,
861 ACE_TEXT ("Acceptor::open")),
863 else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR
)
864 && this->consumer_acceptor_
.start
865 (Options::instance ()->consumer_acceptor_port ()) == -1)
866 ACE_ERROR_RETURN ((LM_ERROR
,
868 ACE_TEXT ("Acceptor::open")),
870 else if (this->connector_
.open () == -1)
871 ACE_ERROR_RETURN ((LM_ERROR
,
873 ACE_TEXT ("Connector::open")),
878 // The following is a "Factory" used by the <ACE_Service_Config> and
879 // svc.conf file to dynamically initialize the <Peer_Acceptor> and
882 ACE_SVC_FACTORY_DEFINE (Peer_Factory
)