1 #include "JAWS/Jaws_IO.h"
2 #include "JAWS/Pipeline_Tasks.h"
3 #include "JAWS/Pipeline_Handler_T.h"
4 #include "JAWS/Data_Block.h"
5 #include "JAWS/IO_Handler.h"
6 #include "JAWS/Policy.h"
// No-argument constructor for JAWS_Pipeline_Handler.
// NOTE(review): the initializer list and body are not visible in this excerpt.
10 JAWS_Pipeline_Handler::JAWS_Pipeline_Handler (void)
// Destructor for JAWS_Pipeline_Handler.
// NOTE(review): the body is not visible in this excerpt.
15 JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler (void)
// Accepts a message block for this pipeline stage.
//
// The block is downcast to a JAWS_Data_Block and handed to this stage's
// handle_put() together with the timeout.  Unless handle_put() reports
// -1 or 2, the task recorded on the data block's IO handler is fetched and
// the task that follows it is looked up.
//
// Parameters:
//   mb - incoming message block; used as a JAWS_Data_Block.
//   tv - timeout value, forwarded unchanged to handle_put().
//
// NOTE(review): the return type, the use made of `next`, and the return
// statements are not visible in this excerpt.
20 JAWS_Pipeline_Handler::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
// NOTE(review): the dynamic_cast result is dereferenced on the next
// statement with no null check in between -- confirm callers only ever
// pass JAWS_Data_Block instances.
22 JAWS_Data_Block
*db
= dynamic_cast<JAWS_Data_Block
*> (mb
);
23 JAWS_IO_Handler
*ioh
= db
->io_handler ();
25 // guarantee the handler remains for the duration of this call
// Run this stage's own processing on the data block.
28 int status
= this->handle_put (db
, tv
);
// Status values -1 and 2 skip the lookup of the following stage
// (presumably "error" and "stop here" codes -- TODO confirm their meaning).
30 if (status
!= -1 && status
!= 2)
// The current task is read from the IO handler rather than from `this`.
32 JAWS_Pipeline_Handler
*task
= ioh
->task ();
33 JAWS_Pipeline_Handler
*next
34 = dynamic_cast<JAWS_Pipeline_Handler
*> (task
->next ());
// Accessor returning a JAWS_Dispatch_Policy pointer for this handler.
// NOTE(review): the body is not visible here; presumably it returns the
// policy_ member that the one-argument overload assigns -- confirm.
44 JAWS_Dispatch_Policy
*
45 JAWS_Pipeline_Handler::policy (void)
// Mutator: records the dispatch policy for this handler.
//
// Parameters:
//   policy - pointer stored into the policy_ member as-is.
// NOTE(review): the return type is not visible in this excerpt.
51 JAWS_Pipeline_Handler::policy (JAWS_Dispatch_Policy
*policy
)
53 this->policy_
= policy
;
// Accepts a message block for the accept stage of the pipeline.
//
// The block is downcast to a JAWS_Data_Block, the task stored on it and the
// task following it are looked up, a fresh IO handler is created through
// new_handler(), and handle_put() is invoked on that new handler's own
// message block.
//
// Parameters:
//   mb - incoming message block; used as a JAWS_Data_Block.
//   tv - timeout value, forwarded unchanged to handle_put().
//
// NOTE(review): the return type, the conditions guarding the error log,
// the use made of `next` and `result`, and the return statements are not
// visible in this excerpt.
57 JAWS_Pipeline_Accept_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
59 JAWS_Data_Block
*db
= dynamic_cast<JAWS_Data_Block
*> (mb
);
// Unlike JAWS_Pipeline_Handler::put, the current task is read from the
// data block itself.
61 JAWS_Pipeline_Handler
*task
= db
->task ();
62 JAWS_Pipeline_Handler
*next
63 = dynamic_cast<JAWS_Pipeline_Handler
*> (task
->next ());
// Each accept gets its own handler and message block (see new_handler()).
65 JAWS_IO_Handler
*ioh
= this->new_handler (db
);
// Error log tagged with this method's name; presumably emitted when
// new_handler() fails -- the guarding condition is not visible here.
68 ACE_ERROR ((LM_ERROR
, "%p\n", "JAWS_Pipeline_Accept_Task::put"));
// Process the new handler's message block, not the incoming one.
77 int result
= this->handle_put (ioh
->message_block (), tv
);
// Accept-stage processing for one data block.
//
// Fetches the IO handler from the data block, picks the dispatch policy
// (this task's policy if set, otherwise the data block's), obtains the
// JAWS_IO object from that policy, and then branches on the handler's
// status.  The visible cases are ACCEPT_OK and ACCEPT_ERROR; a third branch
// traces "ACCEPT_IDLE".  A debug line reporting `result` is logged near the
// end.
//
// Parameters:
//   data - data block being processed; supplies the IO handler and the
//          fallback policy.
//
// NOTE(review): the remainder of the parameter list, the use made of `io`,
// the declaration and assignments of `result`, the case bodies' control
// flow, and the return statement are not visible in this excerpt.
85 JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block
*data
,
90 // JAWS_Data_Block should contain an INET_Addr and an IO
91 JAWS_IO_Handler
*handler
= data
->io_handler ();
92 JAWS_Dispatch_Policy
*policy
= this->policy ();
// Fall back to the data block's policy when this task has none.
94 if (policy
== 0) policy
= data
->policy ();
96 // data->policy ()->update (handler);
98 JAWS_IO
*io
= policy
->io ();
101 // When accept returns, the resulting handle should be stored into
102 // the JAWS_DATA_BLOCK somewhere.
104 // Check the handler for status of the io call
105 switch (handler
->status ())
107 case JAWS_IO_Handler::ACCEPT_OK
:
109 ACE_DEBUG ((LM_DEBUG
, "(%t) ACCEPT_OK\n"));
111 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_OK");
112 // Move on to next stage in pipeline
115 case JAWS_IO_Handler::ACCEPT_ERROR
:
117 ACE_DEBUG ((LM_DEBUG
, "(%t) ACCEPT_ERROR\n"));
119 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_ERROR");
120 // Should recycle the thread
// NOTE(review): the case label for this branch is not visible; the trace
// text suggests it handles an idle status -- confirm.
126 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_IDLE");
127 // Should mean that the IO is asynchronous, and the word isn't out
133 // In asynchronous and synchronous models, we can --
134 // have the io_handler set the new task in the data_block
136 // In asynchronous model, we can --
137 // insert a wait task into the task queue
139 ACE_DEBUG ((LM_DEBUG
, "(%t) Returning %d\n", result
));
// Builds a new data block / IO handler pair for an accept.
//
// A new JAWS_Data_Block is copy-constructed from *data with plain `new`, an
// IO handler is created by the selected policy's handler factory, and the
// two are cross-linked: the block points at the handler, the handler points
// at the block, and the handler is given the same task as the original
// data block.
//
// Parameters:
//   data - template data block; copied, and the source of the task and the
//          fallback policy.
//
// NOTE(review): the return type, the failure checks (only the "failed DB"
// trace is visible), any cleanup on failure, and the return statement are
// not visible in this excerpt.  JAWS_IO_Handler::put's caller uses the
// result as a JAWS_IO_Handler pointer, so presumably `nioh` is returned --
// confirm.
144 JAWS_Pipeline_Accept_Task::new_handler (JAWS_Data_Block
*data
)
146 // Create a new handler and message block
147 JAWS_Data_Block
*ndb
= new JAWS_Data_Block (*data
);
// Trace for the data-block allocation failure path; its guarding condition
// is not visible here.
150 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::new_handler, failed DB");
// Prefer this task's policy; use the data block's policy when none is set.
154 JAWS_Dispatch_Policy
*policy
=
155 (this->policy () == 0) ? data
->policy () : this->policy ();
156 JAWS_IO_Handler_Factory
*ioh_factory
= policy
->ioh_factory ();
158 JAWS_IO_Handler
*nioh
= ioh_factory
->create_io_handler ();
// Cross-link the new block and the new handler, and carry the task over
// from the original data block.
165 ndb
->io_handler (nioh
);
166 nioh
->task (data
->task ());
167 nioh
->message_block (ndb
);
// Accepts a message block for the final ("done") stage of the pipeline.
//
// Traces entry, downcasts the block to a JAWS_Data_Block, fetches its IO
// handler and the effective dispatch policy (this task's policy if set,
// otherwise the data block's), and then clears the data block's IO handler
// pointer.
//
// Parameters:
//   mb - incoming message block; used as a JAWS_Data_Block.
//   (unnamed ACE_Time_Value *) - timeout; has no name and so cannot be used.
//
// NOTE(review): the return type, whatever is done with `handler` and
// `policy`, the code under the "hack" comment, and the return statement
// are not visible in this excerpt.
173 JAWS_Pipeline_Done_Task::put (ACE_Message_Block
*mb
, ACE_Time_Value
*)
175 JAWS_TRACE ("JAWS_Pipeline_Done_Task::put");
177 JAWS_Data_Block
*data
= dynamic_cast<JAWS_Data_Block
*> (mb
);
179 JAWS_IO_Handler
*handler
= data
->io_handler ();
180 JAWS_Dispatch_Policy
*policy
= this->policy ();
// Fall back to the data block's policy when this task has none.
181 if (policy
== 0) policy
= data
->policy ();
183 // JAWS_IO *io = policy->io ();
// Detach the IO handler from the data block.
186 data
->io_handler (0);
191 // hack, let Concurrency know we are done.
// handle_put for the "done" stage.  Both parameters are unnamed, so the
// data block and the timeout cannot be referenced by the body.
// NOTE(review): the return type and body are not visible in this excerpt.
196 JAWS_Pipeline_Done_Task::handle_put (JAWS_Data_Block
*, ACE_Time_Value
*)