1 #include "JAWS/Jaws_IO.h"
2 #include "JAWS/Pipeline_Tasks.h"
3 #include "JAWS/Pipeline_Handler_T.h"
4 #include "JAWS/Data_Block.h"
5 #include "JAWS/IO_Handler.h"
6 #include "JAWS/Policy.h"
// Default constructor for JAWS_Pipeline_Handler (takes no arguments).
// NOTE(review): the initializer list and body are not visible in this
// listing, so the initial state of the object cannot be confirmed from here.
9 JAWS_Pipeline_Handler::JAWS_Pipeline_Handler ()
// Destructor for JAWS_Pipeline_Handler.
// NOTE(review): the body is not visible in this listing; whether it releases
// anything (e.g. the policy pointer) must be checked against the full file.
14 JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler ()
// Entry point for a message block arriving at this pipeline stage.
//
// From the statements visible here:
//   1. `mb` is downcast to a JAWS_Data_Block and its IO handler is fetched.
//   2. The stage-specific work is delegated to handle_put (db, tv).
//   3. When handle_put() returns anything other than -1 or 2, the task
//      currently recorded on the IO handler is read and the stage that
//      follows it is looked up.
//
// Parameters:
//   mb - incoming message block; expected to really be a JAWS_Data_Block.
//   tv - timeout value, passed through unchanged to handle_put().
//
// NOTE(review): only part of the body is visible in this listing. What is
// done with `next`, and the value returned to the caller, cannot be
// confirmed from here.
19 JAWS_Pipeline_Handler::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
// NOTE(review): the dynamic_cast result is dereferenced on the very next
// line without a null check -- confirm every caller passes a JAWS_Data_Block.
21 JAWS_Data_Block
*db
= dynamic_cast<JAWS_Data_Block
*> (mb
);
22 JAWS_IO_Handler
*ioh
= db
->io_handler ();
24 // guarantee the handler remains for the duration of this call
// Run this stage's processing on the data block.
27 int status
= this->handle_put (db
, tv
);
// Only look up the following stage for statuses other than -1 and 2.
// NOTE(review): -1 presumably signals an error; the meaning of 2 is not
// visible here -- TODO confirm against the handle_put() implementations.
29 if (status
!= -1 && status
!= 2)
// The task is re-read from the IO handler after handle_put() has run.
31 JAWS_Pipeline_Handler
*task
= ioh
->task ();
// NOTE(review): `next` is null if task->next () is not a
// JAWS_Pipeline_Handler; how that case is handled is not visible here.
32 JAWS_Pipeline_Handler
*next
33 = dynamic_cast<JAWS_Pipeline_Handler
*> (task
->next ());
// Accessor for this stage's dispatch policy; returns a JAWS_Dispatch_Policy
// pointer.
// NOTE(review): the body is not visible in this listing -- presumably it
// returns this->policy_, the member written by the one-argument overload.
// Callers in this file treat a 0 result as "no policy set on the stage".
43 JAWS_Dispatch_Policy
*
44 JAWS_Pipeline_Handler::policy ()
// Mutator for this stage's dispatch policy: stores the given pointer in
// this->policy_.
//
// Parameters:
//   policy - dispatch policy to associate with this stage; only the pointer
//            is stored, no copy is made by the visible code.
50 JAWS_Pipeline_Handler::policy (JAWS_Dispatch_Policy
*policy
)
52 this->policy_
= policy
;
// Entry point of the accept stage.
//
// From the statements visible here:
//   1. `mb` is downcast to a JAWS_Data_Block.
//   2. The task recorded on the data block, and the stage following it, are
//      looked up.
//   3. A fresh IO handler (with its own data block) is created through
//      new_handler (db).
//   4. handle_put() is invoked on the NEW handler's message block, not on
//      the incoming one.
//
// Parameters:
//   mb - incoming message block; expected to really be a JAWS_Data_Block.
//   tv - timeout value, passed through unchanged to handle_put().
//
// NOTE(review): only part of the body is visible in this listing. The
// condition guarding the ACE_ERROR call, the use made of `next` and
// `result`, and the return value cannot be confirmed from here.
56 JAWS_Pipeline_Accept_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
// NOTE(review): the dynamic_cast result is used without a visible null check.
58 JAWS_Data_Block
*db
= dynamic_cast<JAWS_Data_Block
*> (mb
);
// Here the task comes from the data block itself rather than from an IO
// handler.
60 JAWS_Pipeline_Handler
*task
= db
->task ();
61 JAWS_Pipeline_Handler
*next
62 = dynamic_cast<JAWS_Pipeline_Handler
*> (task
->next ());
// Each accept gets its own handler/data-block pair cloned from `db`.
64 JAWS_IO_Handler
*ioh
= this->new_handler (db
);
// NOTE(review): presumably reached only when new_handler() failed (returned
// 0); the guarding condition is not visible in this listing.
67 ACE_ERROR ((LM_ERROR
, "%p\n", "JAWS_Pipeline_Accept_Task::put"));
// Perform the accept using the new handler's message block.
76 int result
= this->handle_put (ioh
->message_block (), tv
);
// Accept-stage processing for one data block.
//
// From the statements visible here:
//   1. The IO handler is fetched from `data`.
//   2. The dispatch policy is chosen: this stage's own policy if set,
//      otherwise the policy carried by `data`.
//   3. The JAWS_IO object is obtained from that policy.
//   4. The handler's status is switched on (ACCEPT_OK, ACCEPT_ERROR, and a
//      branch traced as ACCEPT_IDLE) and a result code is logged.
//
// Parameters:
//   data - data block for the connection being accepted; supplies the IO
//          handler and a fallback policy.
//   (a second parameter follows in the full signature but is not visible in
//   this listing.)
//
// NOTE(review): the call that actually starts the accept on `io`, the value
// assigned to `result` in each branch, and the return statement are not
// visible in this listing.
84 JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block
*data
,
89 // JAWS_Data_Block should contain an INET_Addr and an IO
90 JAWS_IO_Handler
*handler
= data
->io_handler ();
91 JAWS_Dispatch_Policy
*policy
= this->policy ();
// Fall back to the policy carried by the data block when the stage has none.
93 if (policy
== 0) policy
= data
->policy ();
95 // data->policy ()->update (handler);
97 JAWS_IO
*io
= policy
->io ();
100 // When accept returns, the resulting handle should be stored into
101 // the JAWS_DATA_BLOCK somewhere.
103 // Check the handler for status of the io call
104 switch (handler
->status ())
106 case JAWS_IO_Handler::ACCEPT_OK
:
108 ACE_DEBUG ((LM_DEBUG
, "(%t) ACCEPT_OK\n"));
110 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_OK");
111 // Move on to next stage in pipeline
114 case JAWS_IO_Handler::ACCEPT_ERROR
:
116 ACE_DEBUG ((LM_DEBUG
, "(%t) ACCEPT_ERROR\n"));
118 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_ERROR");
119 // Should recycle the thread
// NOTE(review): the label for this branch is not visible in this listing;
// from the trace text it handles the idle/pending status.
125 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_IDLE");
126 // Should mean that the IO is asynchronous, and the word isn't out
132 // In asynchronous and synchronous models, we can --
133 // have the io_handler set the new task in the data_block
135 // In asynchronous model, we can --
136 // insert a wait task into the task queue
// Log the result code together with the calling thread.
138 ACE_DEBUG ((LM_DEBUG
, "(%t) Returning %d\n", result
));
// Builds a new IO handler together with its own data block, cloned from
// `data`.
//
// From the statements visible here:
//   1. A new JAWS_Data_Block is copy-constructed from *data.
//   2. The dispatch policy is chosen: this stage's own policy if set,
//      otherwise the policy carried by `data`.
//   3. The policy's IO-handler factory creates a new JAWS_IO_Handler.
//   4. The two new objects are linked to each other, and the new handler
//      is given the same task as `data`.
//
// Parameters:
//   data - template data block; supplies the copied contents, the fallback
//          policy and the task assigned to the new handler.
//
// NOTE(review): the failure checks after each allocation and the return
// statement are not visible in this listing. Callers use the result as a
// JAWS_IO_Handler pointer, so the function presumably returns `nioh`.
// Ownership of `ndb` and `nioh` must be confirmed against the full file.
143 JAWS_Pipeline_Accept_Task::new_handler (JAWS_Data_Block
*data
)
145 // Create a new handler and message block
146 JAWS_Data_Block
*ndb
= new JAWS_Data_Block (*data
);
// NOTE(review): presumably traced only when the allocation above failed;
// the guarding condition is not visible in this listing.
149 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::new_handler, failed DB");
// Prefer the stage's own policy; otherwise use the one on the data block.
153 JAWS_Dispatch_Policy
*policy
=
154 (this->policy () == 0) ? data
->policy () : this->policy ();
155 JAWS_IO_Handler_Factory
*ioh_factory
= policy
->ioh_factory ();
157 JAWS_IO_Handler
*nioh
= ioh_factory
->create_io_handler ();
// Cross-link the new data block and the new handler, and carry over the
// task from the original data block.
164 ndb
->io_handler (nioh
);
165 nioh
->task (data
->task ());
166 nioh
->message_block (ndb
);
// Entry point of the final ("done") pipeline stage.
//
// From the statements visible here:
//   1. `mb` is downcast to a JAWS_Data_Block and its IO handler is fetched.
//   2. The dispatch policy is chosen: this stage's own policy if set,
//      otherwise the policy carried by the data block.
//   3. The data block's IO-handler pointer is reset to 0.
//
// Parameters:
//   mb - incoming message block; expected to really be a JAWS_Data_Block.
//   (the ACE_Time_Value* parameter is unnamed and therefore unused.)
//
// NOTE(review): the statements that dispose of `handler` and the data block,
// the "let Concurrency know we are done" hack, and the return value are not
// visible in this listing.
172 JAWS_Pipeline_Done_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*)
174 JAWS_TRACE ("JAWS_Pipeline_Done_Task::put");
// NOTE(review): the dynamic_cast result is used without a visible null check.
176 JAWS_Data_Block
*data
= dynamic_cast<JAWS_Data_Block
*> (mb
);
178 JAWS_IO_Handler
*handler
= data
->io_handler ();
179 JAWS_Dispatch_Policy
*policy
= this->policy ();
// Fall back to the policy carried by the data block when the stage has none.
180 if (policy
== 0) policy
= data
->policy ();
182 // JAWS_IO *io = policy->io ();
// Detach the handler from the data block; `handler` still holds the pointer.
185 data
->io_handler (0);
190 // hack, let Concurrency know we are done.
// handle_put() for the "done" stage. Both parameters are unnamed, so the
// body cannot use either the data block or the timeout.
// NOTE(review): the body and return value are not visible in this listing.
195 JAWS_Pipeline_Done_Task::handle_put (JAWS_Data_Block
*, ACE_Time_Value
*)