Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / ASX / UPIPE_Event_Server / Peer_Router.cpp
blob4014c96c53dd7c6017f9468ee7c7b1270701bf7e
1 #if !defined (_PEER_ROUTER_C)
3 #define _PEER_ROUTER_C
5 #include "ace/Get_Opt.h"
6 #include "ace/Service_Config.h"
8 #include "Peer_Router.h"
9 #include "Options.h"
12 #if defined (ACE_HAS_THREADS)
14 // Define some short-hand macros to deal with long template
15 // names...
17 #define PH PEER_HANDLER
18 #define PA PEER_ACCEPTOR
19 #define PAD PEER_ADDR
20 #define PK PEER_KEY
21 #define PM PEER_MAP
23 template <class PH, class PK> int
24 Acceptor_Factory<PH, PK>::init (int argc, ACE_TCHAR *argv[])
26 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("df:"), 0);
27 ACE_UPIPE_Addr addr;
29 for (int c; (c = get_opt ()) != -1; )
30 switch (c)
32 case 'f':
33 addr.set (get_opt.opt_arg ());
34 break;
35 case 'd':
36 break;
37 default:
38 break;
41 if (this->open (addr, ACE_Reactor::instance ()) == -1)
42 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1);
43 return 0;
46 template <class PH, class PK>
47 Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
48 : pr_ (pr)
52 template <class PH, class PK> Peer_Router<PH, PK> *
53 Acceptor_Factory<PH, PK>::router ()
55 return this->pr_;
58 template <class ROUTER, class KEY>
59 Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
60 : ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH> (tm)
64 template <class ROUTER, class KEY> int
65 Peer_Handler<ROUTER, KEY>::svc ()
67 // Just a try !! we're just reading from our ACE_Message_Queue.
68 ACE_Message_Block *db, *hb;
69 int n;
70 // do an endless loop
71 for (;;)
73 db = new ACE_Message_Block (BUFSIZ);
74 hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db);
76 if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
77 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
78 ACE_TEXT ("recv failed")), -1);
79 else if (n == 0) // Client has closed down the connection.
81 if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
82 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
83 ACE_TEXT ("unbind failed")), -1);
84 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) shutting down\n")));
85 return -1; // We do not need to be deregistered by reactor
86 // as we were not registered at all
88 else // Transform incoming buffer into a Message and pass downstream.
90 db->wr_ptr (n);
91 *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment.
92 hb->wr_ptr (sizeof (long));
93 if (this->router_task_->reply (hb) == -1)
95 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Peer_Handler.svc : router_task->reply failed\n")));
96 return -1;
99 // return this->router_task_->reply (hb) == -1 ? -1 : 0;
102 ACE_NOTREACHED(return 0);
105 template <class ROUTER, class KEY> int
106 Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
108 return this->peer ().send_n (mb->rd_ptr (), mb->length ());
111 // Create a new handler and point its ROUTER_TASK_ data member to the
112 // corresponding router. Note that this router is extracted out of
113 // the Acceptor_Factory * that is passed in via the
114 // ACE_Acceptor::handle_input() method.
116 template <class ROUTER, class KEY> int
117 Peer_Handler<ROUTER, KEY>::open (void *a)
119 ACE_TCHAR buf[BUFSIZ], *p = buf;
121 if (this->router_task_->info (&p, sizeof buf) != -1)
122 ACE_DEBUG ((LM_DEBUG,
123 ACE_TEXT ("(%t) creating handler for %s, fd = %d, this = %@\n"),
124 buf, this->get_handle (), a));
125 else
126 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("info")), -1);
128 if ( this->activate (options.t_flags ()) == -1)
129 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
130 ACE_TEXT ("activation of thread failed")), -1);
131 else if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
132 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
133 ACE_TEXT ("bind_peer")), -1);
134 return 0;
137 // Receive a message from a supplier..
139 template <class ROUTER, class KEY> int
140 Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
142 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) input arrived on sd %d\n"), h));
143 // ACE_Reactor::instance ()->remove_handler(h,
144 // ACE_Event_Handler::ALL_EVENTS_MASK
145 // |ACE_Event_Handler::DONT_CALL);
146 // this method should be called only if the peer shuts down
147 // so we deactivate our ACE_Message_Queue to awake our svc thread
149 return 0;
151 #if 0
152 ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
153 ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db);
154 int n;
156 if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
157 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("recv failed")), -1);
158 else if (n == 0) // Client has closed down the connection.
160 if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
161 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
162 ACE_TEXT ("unbind failed")), -1);
163 ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h));
164 return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
166 else // Transform incoming buffer into a Message and pass downstream.
168 db->wr_ptr (n);
169 *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment.
170 hb->wr_ptr (sizeof (long));
171 return this->router_task_->reply (hb) == -1 ? -1 : 0;
173 #endif
176 template <class PH, class PK>
177 Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
178 : ACE_Task<ACE_MT_SYNCH> (tm)
182 template <class PH, class PK> int
183 Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
185 ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_;
186 int bytes = 0;
187 int iterations = 0;
188 ACE_Message_Block *data_block = mb->cont ();
189 for (ACE_Map_Entry<PK, PH *> *ss = 0;
190 map_iter.next (ss) != 0;
191 map_iter.advance ())
193 if (options.debug ())
194 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) sending to peer via sd %d\n"),
195 ss->ext_id_));
197 iterations++;
198 bytes += ss->int_id_->put (data_block);
201 mb->release ();
202 return bytes == 0 ? 0 : bytes / iterations;
205 template <class PH, class PK>
206 Peer_Router<PH, PK>::~Peer_Router ()
210 template <class PH, class PK> int
211 Peer_Router<PH, PK>::fini ()
213 delete this->acceptor_;
214 return 0;
217 template <class PH, class PK> int
218 Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
220 ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
221 ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
223 switch (command = ioc->cmd ())
225 case ACE_IO_Cntl_Msg::SET_LWM:
226 case ACE_IO_Cntl_Msg::SET_HWM:
227 this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ());
228 break;
229 default:
230 return -1;
232 return 0;
235 template <class PH, class PK> int
236 Peer_Router<PH, PK>::unbind_peer (PK key)
238 return this->peer_map_.unbind (key);
241 template <class PH, class PK> int
242 Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
244 PH *peer_handler = (PH *) ph;
245 return this->peer_map_.bind (key, peer_handler);
248 template <class PH, class PK> int
249 Peer_Router<PH, PK>::init (int argc, ACE_TCHAR *argv[])
251 this->acceptor_ = new Acceptor_Factory <PH, PK> (this);
253 if (this->acceptor_->init (argc, argv) == -1
254 || this->peer_map_.open () == -1)
255 return -1;
256 else
258 ACE_UPIPE_Addr addr;
259 ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor ();
261 if (pa.get_local_addr (addr) != -1)
262 ACE_DEBUG ((LM_DEBUG,
263 ACE_TEXT ("(%t) initializing %s, file = %s, fd = %d, this = %@\n"),
264 this->name (), addr.get_path_name (), pa.get_handle (), this));
265 else
266 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
267 ACE_TEXT ("get_local_addr")), -1);
269 return 0;
272 #undef PH
273 #undef PA
274 #undef PAD
275 #undef PK
276 #undef PM
277 #endif /* ACE_HAS_THREADS */
278 #endif /* _PEER_ROUTER_C */