Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / APG / Threads / Message_Queue.cpp
blob9adc32104bd0fa3f1b48b7c23438390de9072377
1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/SOCK_Acceptor.h"
5 #include "ace/Acceptor.h"
6 #include "Message_Receiver.h"
8 // Listing 5 code/ch12
9 int
10 HA_CommandHandler::svc ()
12 while(1)
14 ACE_Message_Block *mb = 0;
15 if (this->getq (mb) == -1)
16 break;
17 if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
19 mb->release ();
20 break;
22 else
24 // Get header pointer, then move past header to payload.
25 DeviceCommandHeader *dch
26 = (DeviceCommandHeader*)mb->rd_ptr ();
27 mb->rd_ptr (sizeof (DeviceCommandHeader));
28 ACE_DEBUG ((LM_DEBUG,
29 ACE_TEXT ("Message for device #%d with ")
30 ACE_TEXT ("command payload of:\n%s"),
31 dch->deviceId_, mb->rd_ptr ()));
32 this->rep_.update_device (dch->deviceId_,
33 mb->rd_ptr ());
34 mb->release ();
38 ACE_Reactor::instance ()->end_reactor_event_loop ();
40 return 0;
42 // Listing 5
44 // Listing 4 code/ch12
45 ACE_Message_Block *
46 Message_Receiver::shut_down_message ()
48 ACE_Message_Block *mb = 0;
49 ACE_NEW_RETURN
50 (mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), 0);
51 return mb;
53 // Listing 4
55 int
56 Message_Receiver::read_header (DeviceCommandHeader *dch)
58 ssize_t result =
59 this->peer ().recv_n (dch, sizeof (DeviceCommandHeader));
60 if (result <= 0)
61 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
62 ACE_TEXT ("Recieve Failure")),
63 -1);
64 return 0;
66 // Listing 3 code/ch12
67 int
68 Message_Receiver::copy_payload (ACE_Message_Block *mb,
69 int payload_length)
71 ssize_t result =
72 this->peer ().recv_n (mb->wr_ptr (), payload_length);
74 if (result <= 0)
76 mb->release ();
77 return -1;
80 mb->wr_ptr (payload_length);
81 return 0;
83 // Listing 3
84 // Listing 2 code/ch12
85 int
86 Message_Receiver::handle_input (ACE_HANDLE)
88 DeviceCommandHeader dch;
89 if (this->read_header (&dch) < 0)
90 return -1;
92 if (dch.deviceId_ < 0)
94 // Handle shutdown.
95 this->handler_->putq (shut_down_message ());
96 return -1;
99 ACE_Message_Block *mb = 0;
100 ACE_NEW_RETURN
101 (mb, ACE_Message_Block (dch.length_ + sizeof dch), -1);
102 // Copy the header.
103 mb->copy ((const char*)&dch, sizeof dch);
104 // Copy the payload.
105 if (this->copy_payload (mb, dch.length_) < 0)
106 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
107 ACE_TEXT ("Recieve Failure")), -1);
108 // Pass it off to the handler thread.
109 this->handler_->putq (mb);
110 return 0;
112 // Listing 2
114 static void report_usage (int argc, ACE_TCHAR *argv[])
116 if (argc < 2)
118 ACE_DEBUG ((LM_ERROR, ACE_TEXT ("%s port\n"), argv[1]));
119 ACE_OS::exit (-1);
124 class Acceptor : public ACE_Acceptor<Message_Receiver, ACE_SOCK_ACCEPTOR>
126 public:
127 Acceptor(HA_CommandHandler *handler) : handler_(handler)
130 protected:
131 virtual int make_svc_handler (Message_Receiver *&mr)
133 ACE_NEW_RETURN (mr, Message_Receiver (handler_), -1);
134 return 0;
137 private:
138 HA_CommandHandler *handler_;
141 int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
143 report_usage (argc, argv);
145 u_short port = ACE_OS::atoi (argv[1]);
147 HA_Device_Repository rep;
148 HA_CommandHandler handler (rep);
149 ACE_ASSERT(handler.activate()==0);
150 //start up the handler.
152 Acceptor acceptor (&handler);
153 ACE_INET_Addr addr (port);
154 if (acceptor.open (addr) == -1)
155 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
156 ACE_TEXT ("Failed to open connection")), -1);
158 ACE_Reactor::instance()->run_reactor_event_loop ();
159 //run the reactive event loop
161 handler.wait ();
162 //reap the handler before exiting.
164 return 0;
167 #else
168 #include "ace/OS_main.h"
169 #include "ace/OS_NS_stdio.h"
171 int ACE_TMAIN (int, ACE_TCHAR *[])
173 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
174 return 0;
177 #endif /* ACE_HAS_THREADS */