1 #if !defined (_PEER_ROUTER_C)
5 #include "ace/Get_Opt.h"
6 #include "ace/Service_Config.h"
8 #include "Peer_Router.h"
12 #if defined (ACE_HAS_THREADS)
14 // Define some short-hand macros to deal with long templates
17 #define PH PEER_HANDLER
18 #define PA PEER_ACCEPTOR
23 template <class PH
, class PK
> int
24 Acceptor_Factory
<PH
, PK
>::init (int argc
, ACE_TCHAR
*argv
[])
26 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("df:"), 0);
29 for (int c
; (c
= get_opt ()) != -1; )
33 addr
.set (get_opt
.opt_arg ());
41 if (this->open (addr
, ACE_Reactor::instance ()) == -1)
42 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1);
46 template <class PH
, class PK
>
47 Acceptor_Factory
<PH
, PK
>::Acceptor_Factory (Peer_Router
<PH
, PK
> *pr
)
52 template <class PH
, class PK
> Peer_Router
<PH
, PK
> *
53 Acceptor_Factory
<PH
, PK
>::router ()
58 template <class ROUTER
, class KEY
>
59 Peer_Handler
<ROUTER
, KEY
>::Peer_Handler (ACE_Thread_Manager
*tm
)
60 : ACE_Svc_Handler
<ACE_UPIPE_STREAM
, ACE_MT_SYNCH
> (tm
)
64 template <class ROUTER
, class KEY
> int
65 Peer_Handler
<ROUTER
, KEY
>::svc ()
67 // Just a try !! we're just reading from our ACE_Message_Queue.
68 ACE_Message_Block
*db
, *hb
;
73 db
= new ACE_Message_Block (BUFSIZ
);
74 hb
= new ACE_Message_Block (sizeof (KEY
), ACE_Message_Block::MB_PROTO
, db
);
76 if ((n
= this->peer ().recv (db
->rd_ptr (), db
->size ())) == -1)
77 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
78 ACE_TEXT ("recv failed")), -1);
79 else if (n
== 0) // Client has closed down the connection.
81 if (this->router_task_
->unbind_peer (this->get_handle ()) == -1)
82 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
83 ACE_TEXT ("unbind failed")), -1);
84 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) shutting down\n")));
85 return -1; // We do not need to be deregistered by reactor
86 // as we were not registered at all
88 else // Transform incoming buffer into a Message and pass downstream.
91 *(ACE_HANDLE
*) hb
->rd_ptr () = this->get_handle (); // structure assignment.
92 hb
->wr_ptr (sizeof (long));
93 if (this->router_task_
->reply (hb
) == -1)
95 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Peer_Handler.svc : router_task->reply failed\n")));
99 // return this->router_task_->reply (hb) == -1 ? -1 : 0;
102 ACE_NOTREACHED(return 0);
105 template <class ROUTER
, class KEY
> int
106 Peer_Handler
<ROUTER
, KEY
>::put (ACE_Message_Block
*mb
, ACE_Time_Value
*)
108 return this->peer ().send_n (mb
->rd_ptr (), mb
->length ());
111 // Create a new handler and point its ROUTER_TASK_ data member to the
112 // corresponding router. Note that this router is extracted out of
113 // the Acceptor_Factory * that is passed in via the
114 // ACE_Acceptor::handle_input() method.
116 template <class ROUTER
, class KEY
> int
117 Peer_Handler
<ROUTER
, KEY
>::open (void *a
)
119 ACE_TCHAR buf
[BUFSIZ
], *p
= buf
;
121 if (this->router_task_
->info (&p
, sizeof buf
) != -1)
122 ACE_DEBUG ((LM_DEBUG
,
123 ACE_TEXT ("(%t) creating handler for %s, fd = %d, this = %@\n"),
124 buf
, this->get_handle (), a
));
126 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("info")), -1);
128 if ( this->activate (options
.t_flags ()) == -1)
129 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
130 ACE_TEXT ("activation of thread failed")), -1);
131 else if (this->router_task_
->bind_peer (this->get_handle (), this) == -1)
132 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
133 ACE_TEXT ("bind_peer")), -1);
137 // Receive a message from a supplier..
139 template <class ROUTER
, class KEY
> int
140 Peer_Handler
<ROUTER
, KEY
>::handle_input (ACE_HANDLE h
)
142 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) input arrived on sd %d\n"), h
));
143 // ACE_Reactor::instance ()->remove_handler(h,
144 // ACE_Event_Handler::ALL_EVENTS_MASK
145 // |ACE_Event_Handler::DONT_CALL);
146 // this method should be called only if the peer shuts down
147 // so we deactivate our ACE_Message_Queue to awake our svc thread
152 ACE_Message_Block
*db
= new ACE_Message_Block (BUFSIZ
);
153 ACE_Message_Block
*hb
= new ACE_Message_Block (sizeof (KEY
), ACE_Message_Block::MB_PROTO
, db
);
156 if ((n
= this->peer ().recv (db
->rd_ptr (), db
->size ())) == -1)
157 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("recv failed")), -1);
158 else if (n
== 0) // Client has closed down the connection.
160 if (this->router_task_
->unbind_peer (this->get_handle ()) == -1)
161 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
162 ACE_TEXT ("unbind failed")), -1);
163 ACE_DEBUG ((LM_DEBUG
, "(%t) shutting down %d\n", h
));
164 return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
166 else // Transform incoming buffer into a Message and pass downstream.
169 *(long *) hb
->rd_ptr () = this->get_handle (); // structure assignment.
170 hb
->wr_ptr (sizeof (long));
171 return this->router_task_
->reply (hb
) == -1 ? -1 : 0;
176 template <class PH
, class PK
>
177 Peer_Router
<PH
, PK
>::Peer_Router (ACE_Thread_Manager
*tm
)
178 : ACE_Task
<ACE_MT_SYNCH
> (tm
)
182 template <class PH
, class PK
> int
183 Peer_Router
<PH
, PK
>::send_peers (ACE_Message_Block
*mb
)
185 ACE_Map_Iterator
<PK
, PH
*, ACE_RW_Mutex
> map_iter
= this->peer_map_
;
188 ACE_Message_Block
*data_block
= mb
->cont ();
189 for (ACE_Map_Entry
<PK
, PH
*> *ss
= 0;
190 map_iter
.next (ss
) != 0;
193 if (options
.debug ())
194 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) sending to peer via sd %d\n"),
198 bytes
+= ss
->int_id_
->put (data_block
);
202 return bytes
== 0 ? 0 : bytes
/ iterations
;
205 template <class PH
, class PK
>
206 Peer_Router
<PH
, PK
>::~Peer_Router ()
210 template <class PH
, class PK
> int
211 Peer_Router
<PH
, PK
>::fini ()
213 delete this->acceptor_
;
217 template <class PH
, class PK
> int
218 Peer_Router
<PH
, PK
>::control (ACE_Message_Block
*mb
)
220 ACE_IO_Cntl_Msg
*ioc
= (ACE_IO_Cntl_Msg
*) mb
->rd_ptr ();
221 ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command
;
223 switch (command
= ioc
->cmd ())
225 case ACE_IO_Cntl_Msg::SET_LWM
:
226 case ACE_IO_Cntl_Msg::SET_HWM
:
227 this->water_marks (command
, *(size_t *) mb
->cont ()->rd_ptr ());
235 template <class PH
, class PK
> int
236 Peer_Router
<PH
, PK
>::unbind_peer (PK key
)
238 return this->peer_map_
.unbind (key
);
241 template <class PH
, class PK
> int
242 Peer_Router
<PH
, PK
>::bind_peer (PK key
, Peer_Handler
<Peer_Router
<PH
, PK
>, PK
> *ph
)
244 PH
*peer_handler
= (PH
*) ph
;
245 return this->peer_map_
.bind (key
, peer_handler
);
248 template <class PH
, class PK
> int
249 Peer_Router
<PH
, PK
>::init (int argc
, ACE_TCHAR
*argv
[])
251 this->acceptor_
= new Acceptor_Factory
<PH
, PK
> (this);
253 if (this->acceptor_
->init (argc
, argv
) == -1
254 || this->peer_map_
.open () == -1)
259 ACE_UPIPE_Acceptor
&pa
= this->acceptor_
->acceptor ();
261 if (pa
.get_local_addr (addr
) != -1)
262 ACE_DEBUG ((LM_DEBUG
,
263 ACE_TEXT ("(%t) initializing %s, file = %s, fd = %d, this = %@\n"),
264 this->name (), addr
.get_path_name (), pa
.get_handle (), this));
266 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
267 ACE_TEXT ("get_local_addr")), -1);
277 #endif /* ACE_HAS_THREADS */
278 #endif /* _PEER_ROUTER_C */