Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / apps / JAWS2 / JAWS / Pipeline_Tasks.cpp
blob3f27630b8b03372226dd482d3bb798f985ceb34c
1 #include "JAWS/Jaws_IO.h"
2 #include "JAWS/Pipeline_Tasks.h"
3 #include "JAWS/Pipeline_Handler_T.h"
4 #include "JAWS/Data_Block.h"
5 #include "JAWS/IO_Handler.h"
6 #include "JAWS/Policy.h"
10 JAWS_Pipeline_Handler::JAWS_Pipeline_Handler (void)
11 : policy_ (0)
15 JAWS_Pipeline_Handler::~JAWS_Pipeline_Handler (void)
19 int
20 JAWS_Pipeline_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
22 JAWS_Data_Block *db = dynamic_cast<JAWS_Data_Block *> (mb);
23 JAWS_IO_Handler *ioh = db->io_handler ();
25 // guarantee the handler remains for the duration of this call
26 ioh->acquire ();
28 int status = this->handle_put (db, tv);
30 if (status != -1 && status != 2)
32 JAWS_Pipeline_Handler *task = ioh->task ();
33 JAWS_Pipeline_Handler *next
34 = dynamic_cast<JAWS_Pipeline_Handler *> (task->next ());
36 ioh->task (next);
39 ioh->release ();
41 return status;
44 JAWS_Dispatch_Policy *
45 JAWS_Pipeline_Handler::policy (void)
47 return this->policy_;
50 void
51 JAWS_Pipeline_Handler::policy (JAWS_Dispatch_Policy *policy)
53 this->policy_ = policy;
56 int
57 JAWS_Pipeline_Accept_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
59 JAWS_Data_Block *db = dynamic_cast<JAWS_Data_Block *> (mb);
61 JAWS_Pipeline_Handler *task = db->task ();
62 JAWS_Pipeline_Handler *next
63 = dynamic_cast<JAWS_Pipeline_Handler *> (task->next ());
65 JAWS_IO_Handler *ioh = this->new_handler (db);
66 if (ioh == 0)
68 ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Pipeline_Accept_Task::put"));
69 return -1;
72 ioh->acquire ();
74 ioh->task (next);
75 db->io_handler (ioh);
77 int result = this->handle_put (ioh->message_block (), tv);
79 ioh->release ();
81 return result;
84 int
85 JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data,
86 ACE_Time_Value *)
88 int result = -1;
90 // JAWS_Data_Block should contain an INET_Addr and an IO
91 JAWS_IO_Handler *handler = data->io_handler ();
92 JAWS_Dispatch_Policy *policy = this->policy ();
94 if (policy == 0) policy = data->policy ();
96 // data->policy ()->update (handler);
98 JAWS_IO *io = policy->io ();
99 io->accept (handler);
101 // When accept returns, the resulting handle should be stored into
102 // the JAWS_DATA_BLOCK somewhere.
104 // Check the handler for status of the io call
105 switch (handler->status ())
107 case JAWS_IO_Handler::ACCEPT_OK:
109 ACE_DEBUG ((LM_DEBUG, "(%t) ACCEPT_OK\n"));
110 result = 0;
111 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_OK");
112 // Move on to next stage in pipeline
113 break;
115 case JAWS_IO_Handler::ACCEPT_ERROR:
117 ACE_DEBUG ((LM_DEBUG, "(%t) ACCEPT_ERROR\n"));
118 result = -1;
119 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_ERROR");
120 // Should recycle the thread
121 break;
123 default:
125 result = 1;
126 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::handle_put ACCEPT_IDLE");
127 // Should mean that the IO is asynchronous, and the word isn't out
128 // yet.
129 break;
133 // In asynchronous and synchronous models, we can --
134 // have the io_handler set the new task in the data_block
136 // In asynchronous model, we can --
137 // insert a wait task into the task queue
139 ACE_DEBUG ((LM_DEBUG, "(%t) Returning %d\n", result));
140 return result;
143 JAWS_IO_Handler *
144 JAWS_Pipeline_Accept_Task::new_handler (JAWS_Data_Block *data)
146 // Create a new handler and message block
147 JAWS_Data_Block *ndb = new JAWS_Data_Block (*data);
148 if (ndb == 0)
150 JAWS_TRACE ("JAWS_Pipeline_Accept_Task::new_handler, failed DB");
151 return 0;
154 JAWS_Dispatch_Policy *policy =
155 (this->policy () == 0) ? data->policy () : this->policy ();
156 JAWS_IO_Handler_Factory *ioh_factory = policy->ioh_factory ();
158 JAWS_IO_Handler *nioh = ioh_factory->create_io_handler ();
159 if (nioh == 0)
161 delete ndb;
162 return 0;
165 ndb->io_handler (nioh);
166 nioh->task (data->task ());
167 nioh->message_block (ndb);
169 return nioh;
173 JAWS_Pipeline_Done_Task::put (ACE_Message_Block *mb, ACE_Time_Value *)
175 JAWS_TRACE ("JAWS_Pipeline_Done_Task::put");
177 JAWS_Data_Block *data = dynamic_cast<JAWS_Data_Block *> (mb);
179 JAWS_IO_Handler *handler = data->io_handler ();
180 JAWS_Dispatch_Policy *policy = this->policy ();
181 if (policy == 0) policy = data->policy ();
183 // JAWS_IO *io = policy->io ();
185 data->task (0);
186 data->io_handler (0);
188 if (handler)
189 handler->done ();
191 // hack, let Concurrency know we are done.
192 return -2;
196 JAWS_Pipeline_Done_Task::handle_put (JAWS_Data_Block *, ACE_Time_Value *)
198 return 0;